clientTmq.c 89.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "cJSON.h"
17 18 19
#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
H
Haojun Liao 已提交
20
#include "tdatablock.h"
21 22
#include "tdef.h"
#include "tglobal.h"
X
Xiaoyu Wang 已提交
23
#include "tqueue.h"
24
#include "tref.h"
L
Liu Jicong 已提交
25 26
#include "ttimer.h"

X
Xiaoyu Wang 已提交
27 28
// NOTE(review): presumably the idle time applied after a vgroup returns an empty block; unit is not visible here — confirm
#define EMPTY_BLOCK_POLL_IDLE_DURATION 10
// default for tmq_conf_t.autoCommitInterval, i.e. the "auto.commit.interval.ms" option
#define DEFAULT_AUTO_COMMIT_INTERVAL   5000
29

30 31
// evaluates to true when the given offset value is negative
#define OFFSET_IS_RESET_OFFSET(_of)  ((_of) < 0)

32 33
// callback type accepted by asyncAskEp(): receives the consumer, a result code, the response buffer and the caller's param
typedef void (*__tmq_askep_fn_t)(tmq_t* pTmq, int32_t code, SDataBuf* pBuf, void* pParam);

X
Xiaoyu Wang 已提交
34
struct SMqMgmt {
35 36 37
  int8_t  inited;
  tmr_h   timer;
  int32_t rsetId;
38
};
L
Liu Jicong 已提交
39

X
Xiaoyu Wang 已提交
40 41
static TdThreadOnce   tmqInit = PTHREAD_ONCE_INIT;  // initialize only once
volatile int32_t      tmqInitRes = 0;               // initialize rsp code
42
static struct SMqMgmt tmqMgmt = {0};
43

L
Liu Jicong 已提交
44 45 46 47 48 49
// Minimal response wrapper; SMqAskEpRspWrapper and SMqPollRspWrapper start with the same two fields.
typedef struct {
  int8_t  tmqRspType;
  int32_t epoch;
} SMqRspWrapper;

typedef struct {
L
Liu Jicong 已提交
50 51 52
  int8_t      tmqRspType;
  int32_t     epoch;
  SMqAskEpRsp msg;
L
Liu Jicong 已提交
53 54
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
55
struct tmq_list_t {
L
Liu Jicong 已提交
56
  SArray container;
L
Liu Jicong 已提交
57
};
L
Liu Jicong 已提交
58

L
Liu Jicong 已提交
59
struct tmq_conf_t {
60 61 62 63 64 65 66 67
  char           clientId[256];
  char           groupId[TSDB_CGROUP_LEN];
  int8_t         autoCommit;
  int8_t         resetOffset;
  int8_t         withTbName;
  int8_t         snapEnable;
  int32_t        snapBatchSize;
  bool           hbBgEnable;
68 69 70 71 72
  uint16_t       port;
  int32_t        autoCommitInterval;
  char*          ip;
  char*          user;
  char*          pass;
73
  tmq_commit_cb* commitCb;
L
Liu Jicong 已提交
74
  void*          commitCbUserParam;
L
Liu Jicong 已提交
75 76 77
};

struct tmq_t {
78 79 80 81 82 83 84 85 86 87
  int64_t        refId;
  char           groupId[TSDB_CGROUP_LEN];
  char           clientId[256];
  int8_t         withTbName;
  int8_t         useSnapshot;
  int8_t         autoCommit;
  int32_t        autoCommitInterval;
  int32_t        resetOffsetCfg;
  uint64_t       consumerId;
  bool           hbBgEnable;
L
Liu Jicong 已提交
88 89
  tmq_commit_cb* commitCb;
  void*          commitCbUserParam;
L
Liu Jicong 已提交
90 91

  // status
wmmhello's avatar
wmmhello 已提交
92
  SRWLatch        lock;
L
Liu Jicong 已提交
93 94
  int8_t  status;
  int32_t epoch;
L
Liu Jicong 已提交
95 96
#if 0
  int8_t  epStatus;
L
Liu Jicong 已提交
97
  int32_t epSkipCnt;
L
Liu Jicong 已提交
98
#endif
99
  // poll info
X
Xiaoyu Wang 已提交
100 101
  int64_t pollCnt;
  int64_t totalRows;
L
Liu Jicong 已提交
102

L
Liu Jicong 已提交
103
  // timer
X
Xiaoyu Wang 已提交
104 105 106 107 108 109 110 111 112 113
  tmr_h       hbLiveTimer;
  tmr_h       epTimer;
  tmr_h       reportTimer;
  tmr_h       commitTimer;
  STscObj*    pTscObj;       // connection
  SArray*     clientTopics;  // SArray<SMqClientTopic>
  STaosQueue* mqueue;        // queue of rsp
  STaosQall*  qall;
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit
  tsem_t      rspSem;
L
Liu Jicong 已提交
114 115
};

116 117
typedef struct SAskEpInfo {
  int32_t code;
H
Haojun Liao 已提交
118
  tsem_t  sem;
119 120
} SAskEpInfo;

X
Xiaoyu Wang 已提交
121 122 123 124 125 126 127 128
// vgroup status values.
// NOTE(review): presumably stored in SMqClientVg.vgStatus — confirm against the poll code.
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT,
};

// consumer status values.
// NOTE(review): presumably stored in tmq_t.status — confirm against the subscribe/poll code.
enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY,
  TMQ_CONSUMER_STATUS__NO_TOPIC,
  TMQ_CONSUMER_STATUS__RECOVER,
};

L
Liu Jicong 已提交
133
// Task types written (as int8_t items) to tmq_t.delayedTask by the timer callbacks.
enum {
  TMQ_DELAYED_TASK__ASK_EP = 1,
  TMQ_DELAYED_TASK__REPORT,
  TMQ_DELAYED_TASK__COMMIT,
};

H
Haojun Liao 已提交
139
typedef struct SVgOffsetInfo {
L
Liu Jicong 已提交
140 141
  STqOffsetVal committedOffset;
  STqOffsetVal currentOffset;
H
Haojun Liao 已提交
142 143 144 145 146 147 148 149 150 151
  int64_t      walVerBegin;
  int64_t      walVerEnd;
} SVgOffsetInfo;

typedef struct {
  int64_t       pollCnt;
  int64_t       numOfRows;
  SVgOffsetInfo offsetInfo;
  int32_t       vgId;
  int32_t       vgStatus;
H
Haojun Liao 已提交
152 153 154 155
  int32_t       vgSkipCnt;              // here used to mark the slow vgroups
  bool          receivedInfoFromVnode;  // has already received info from vnode
  int64_t       emptyBlockReceiveTs;    // once empty block is received, idle for ignoreCnt then start to poll data
  bool          seekUpdated;            // offset is updated by seek operator, therefore, not update by vnode rsp.
H
Haojun Liao 已提交
156
  SEpSet        epSet;
157 158
} SMqClientVg;

L
Liu Jicong 已提交
159
typedef struct {
160 161 162
  char           topicName[TSDB_TOPIC_FNAME_LEN];
  char           db[TSDB_DB_FNAME_LEN];
  SArray*        vgs;  // SArray<SMqClientVg>
L
Liu Jicong 已提交
163
  SSchemaWrapper schema;
164 165
} SMqClientTopic;

L
Liu Jicong 已提交
166 167
typedef struct {
  int8_t          tmqRspType;
168
  int32_t         epoch;  // epoch can be used to guard the vgHandle
169
  int32_t         vgId;
wmmhello's avatar
wmmhello 已提交
170
  char            topicName[TSDB_TOPIC_FNAME_LEN];
L
Liu Jicong 已提交
171 172
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
H
Haojun Liao 已提交
173
  uint64_t        reqId;
174
  SEpSet*         pEpset;
L
Liu Jicong 已提交
175
  union {
L
Liu Jicong 已提交
176 177
    SMqDataRsp dataRsp;
    SMqMetaRsp metaRsp;
L
Liu Jicong 已提交
178
    STaosxRsp  taosxRsp;
L
Liu Jicong 已提交
179
  };
L
Liu Jicong 已提交
180 181
} SMqPollRspWrapper;

L
Liu Jicong 已提交
182
typedef struct {
wmmhello's avatar
wmmhello 已提交
183 184
//  int64_t refId;
//  int32_t epoch;
L
Liu Jicong 已提交
185 186
  tsem_t  rspSem;
  int32_t rspErr;
L
Liu Jicong 已提交
187
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
188

L
Liu Jicong 已提交
189
typedef struct {
190 191 192 193
  int64_t          refId;
  int32_t          epoch;
  void*            pParam;
  __tmq_askep_fn_t pUserFn;
194 195
} SMqAskEpCbParam;

L
Liu Jicong 已提交
196
typedef struct {
197 198
  int64_t         refId;
  int32_t         epoch;
wmmhello's avatar
wmmhello 已提交
199 200 201
  char            topicName[TSDB_TOPIC_FNAME_LEN];
//  SMqClientVg*    pVg;
//  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
202
  int32_t         vgId;
X
Xiaoyu Wang 已提交
203
  uint64_t        requestId;  // request id for debug purpose
X
Xiaoyu Wang 已提交
204
} SMqPollCbParam;
205

206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
// State referenced from SMqVgWalInfoParam (via pCommon).
// NOTE(review): presumably one instance is shared by all per-vgroup requests of a batch,
// with mutex guarding pList/numOfRsp — usage is not visible here, confirm.
typedef struct SMqVgCommon {
  tsem_t        rsp;
  int32_t       numOfRsp;
  SArray*       pList;
  TdThreadMutex mutex;
  int64_t       consumerId;
  char*         pTopicName;
  int32_t       code;
} SMqVgCommon;

// Per-vgroup request parameter that points at an SMqVgCommon.
typedef struct SMqVgWalInfoParam {
  int32_t      vgId;
  int32_t      epoch;
  int32_t      totalReq;
  SMqVgCommon* pCommon;
} SMqVgWalInfoParam;

223
typedef struct {
224 225
  int64_t        refId;
  int32_t        epoch;
L
Liu Jicong 已提交
226 227
  int32_t        waitingRspNum;
  int32_t        totalRspNum;
228
  int32_t        code;
229
  tmq_commit_cb* callbackFn;
L
Liu Jicong 已提交
230 231
  /*SArray*        successfulOffsets;*/
  /*SArray*        failedOffsets;*/
X
Xiaoyu Wang 已提交
232
  void* userParam;
233 234 235 236
} SMqCommitCbParamSet;

typedef struct {
  SMqCommitCbParamSet* params;
237
  SMqVgOffset*         pOffset;
H
Haojun Liao 已提交
238 239 240
  char                 topicName[TSDB_TOPIC_FNAME_LEN];
  int32_t              vgId;
  tmq_t*               pTmq;
241
} SMqCommitCbParam;
242

243 244 245 246 247
// Semaphore plus result code pair.
// NOTE(review): presumably used to turn an async commit into a blocking call — usage is not visible here.
typedef struct SSyncCommitInfo {
  tsem_t  sem;
  int32_t code;
} SSyncCommitInfo;

248
static int32_t doAskEp(tmq_t* tmq);
249 250
static int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg);
static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet);
251
static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicName, SMqCommitCbParamSet* pParamSet,
252
                               int32_t index, int32_t totalVgroups, int32_t type);
X
Xiaoyu Wang 已提交
253 254 255
static void    commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId);
static void    asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param);
static void    addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param);
256

257
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
258
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
259 260 261 262 263
  if (conf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return conf;
  }

264
  conf->withTbName = false;
L
Liu Jicong 已提交
265
  conf->autoCommit = true;
266
  conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL;
267
  conf->resetOffset = TMQ_OFFSET__RESET_EARLIEAST;
268
  conf->hbBgEnable = true;
269

270 271 272
  return conf;
}

L
Liu Jicong 已提交
273
// Releases a configuration together with its heap-allocated ip/user/pass strings.
// A NULL conf is ignored.
void tmq_conf_destroy(tmq_conf_t* conf) {
  if (conf == NULL) {
    return;
  }

  if (conf->ip != NULL) taosMemoryFree(conf->ip);
  if (conf->user != NULL) taosMemoryFree(conf->user);
  if (conf->pass != NULL) taosMemoryFree(conf->pass);

  taosMemoryFree(conf);
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
289
  if (strcasecmp(key, "group.id") == 0) {
L
Liu Jicong 已提交
290
    tstrncpy(conf->groupId, value, TSDB_CGROUP_LEN);
L
Liu Jicong 已提交
291
    return TMQ_CONF_OK;
292
  }
L
Liu Jicong 已提交
293

294
  if (strcasecmp(key, "client.id") == 0) {
L
Liu Jicong 已提交
295
    tstrncpy(conf->clientId, value, 256);
L
Liu Jicong 已提交
296 297
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
298

299 300
  if (strcasecmp(key, "enable.auto.commit") == 0) {
    if (strcasecmp(value, "true") == 0) {
L
Liu Jicong 已提交
301
      conf->autoCommit = true;
L
Liu Jicong 已提交
302
      return TMQ_CONF_OK;
303
    } else if (strcasecmp(value, "false") == 0) {
L
Liu Jicong 已提交
304
      conf->autoCommit = false;
L
Liu Jicong 已提交
305 306 307 308
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
309
  }
L
Liu Jicong 已提交
310

311
  if (strcasecmp(key, "auto.commit.interval.ms") == 0) {
312
    conf->autoCommitInterval = taosStr2int64(value);
L
Liu Jicong 已提交
313 314 315
    return TMQ_CONF_OK;
  }

316 317 318
  if (strcasecmp(key, "auto.offset.reset") == 0) {
    if (strcasecmp(value, "none") == 0) {
      conf->resetOffset = TMQ_OFFSET__RESET_NONE;
L
Liu Jicong 已提交
319
      return TMQ_CONF_OK;
320 321
    } else if (strcasecmp(value, "earliest") == 0) {
      conf->resetOffset = TMQ_OFFSET__RESET_EARLIEAST;
L
Liu Jicong 已提交
322
      return TMQ_CONF_OK;
323 324
    } else if (strcasecmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_OFFSET__RESET_LATEST;
L
Liu Jicong 已提交
325 326 327 328 329
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
330

331 332
  if (strcasecmp(key, "msg.with.table.name") == 0) {
    if (strcasecmp(value, "true") == 0) {
333
      conf->withTbName = true;
L
Liu Jicong 已提交
334
      return TMQ_CONF_OK;
335
    } else if (strcasecmp(value, "false") == 0) {
336
      conf->withTbName = false;
L
Liu Jicong 已提交
337
      return TMQ_CONF_OK;
338 339 340 341 342
    } else {
      return TMQ_CONF_INVALID;
    }
  }

343 344
  if (strcasecmp(key, "experimental.snapshot.enable") == 0) {
    if (strcasecmp(value, "true") == 0) {
L
Liu Jicong 已提交
345
      conf->snapEnable = true;
346
      return TMQ_CONF_OK;
347
    } else if (strcasecmp(value, "false") == 0) {
L
Liu Jicong 已提交
348
      conf->snapEnable = false;
349 350 351 352 353 354
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }

355
  if (strcasecmp(key, "experimental.snapshot.batch.size") == 0) {
356
    conf->snapBatchSize = taosStr2int64(value);
L
Liu Jicong 已提交
357 358 359
    return TMQ_CONF_OK;
  }

360
  if (strcasecmp(key, "enable.heartbeat.background") == 0) {
X
Xiaoyu Wang 已提交
361 362 363 364 365 366 367 368 369 370
    //    if (strcasecmp(value, "true") == 0) {
    //      conf->hbBgEnable = true;
    //      return TMQ_CONF_OK;
    //    } else if (strcasecmp(value, "false") == 0) {
    //      conf->hbBgEnable = false;
    //      return TMQ_CONF_OK;
    //    } else {
    tscError("the default value of enable.heartbeat.background is true, can not be seted");
    return TMQ_CONF_INVALID;
    //    }
L
Liu Jicong 已提交
371 372
  }

373
  if (strcasecmp(key, "td.connect.ip") == 0) {
374
    conf->ip = taosStrdup(value);
L
Liu Jicong 已提交
375 376
    return TMQ_CONF_OK;
  }
377

378
  if (strcasecmp(key, "td.connect.user") == 0) {
379
    conf->user = taosStrdup(value);
L
Liu Jicong 已提交
380 381
    return TMQ_CONF_OK;
  }
382

383
  if (strcasecmp(key, "td.connect.pass") == 0) {
384
    conf->pass = taosStrdup(value);
L
Liu Jicong 已提交
385 386
    return TMQ_CONF_OK;
  }
387

388
  if (strcasecmp(key, "td.connect.port") == 0) {
389
    conf->port = taosStr2int64(value);
L
Liu Jicong 已提交
390 391
    return TMQ_CONF_OK;
  }
392

393
  if (strcasecmp(key, "td.connect.db") == 0) {
L
Liu Jicong 已提交
394 395 396
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
397
  return TMQ_CONF_UNKNOWN;
398 399
}

X
Xiaoyu Wang 已提交
400
tmq_list_t* tmq_list_new() { return (tmq_list_t*)taosArrayInit(0, sizeof(void*)); }
401

L
Liu Jicong 已提交
402 403
// Appends a heap copy of src to the list.
// Returns 0 on success and -1 when list is NULL, src is NULL/empty, or memory runs out.
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  if (list == NULL || src == NULL || src[0] == 0) return -1;

  char* topic = taosStrdup(src);
  if (topic == NULL) return -1;  // never store a NULL entry

  if (taosArrayPush(&list->container, &topic) == NULL) {
    taosMemoryFree(topic);  // the list did not take ownership
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
410
// Destroys the list and frees every stored string with taosMemoryFree.
void tmq_list_destroy(tmq_list_t* list) {
  taosArrayDestroyP(&list->container, taosMemoryFree);
}

L
Liu Jicong 已提交
415 416 417 418 419 420 421 422 423 424
// Returns the number of entries in the list.
int32_t tmq_list_get_size(const tmq_list_t* list) {
  return taosArrayGetSize(&list->container);
}

// Exposes the list's backing storage as an array of C strings.
// The returned pointer is the container's own data buffer, not a copy.
char** tmq_list_to_c_array(const tmq_list_t* list) {
  return list->container.pData;
}

X
Xiaoyu Wang 已提交
425 426
// Looks up the vgroup vgId inside the topic named pName.
//
// On return *numOfVgroups holds the vgroup count of the last topic whose name matched
// (0 if none matched) and *index the position of the vgroup within that topic, or -1
// when it was not found. Returns the vgroup, or NULL when there is no match.
static SMqClientVg* foundClientVg(SArray* pTopicList, const char* pName, int32_t vgId, int32_t* index,
                                  int32_t* numOfVgroups) {
  int32_t topicCnt = taosArrayGetSize(pTopicList);
  *index = -1;
  *numOfVgroups = 0;

  for (int32_t t = 0; t < topicCnt; ++t) {
    SMqClientTopic* pCur = taosArrayGet(pTopicList, t);
    if (strcmp(pCur->topicName, pName) == 0) {
      *numOfVgroups = taosArrayGetSize(pCur->vgs);

      for (int32_t v = 0; v < (*numOfVgroups); ++v) {
        SMqClientVg* pCandidate = taosArrayGet(pCur->vgs, v);
        if (pCandidate->vgId == vgId) {
          *index = v;
          return pCandidate;
        }
      }
    }
  }

  return NULL;
}
449

450 451 452
// Two problems do not need to be addressed here:
// 1. An update of the epset: the response of a poll request handles this automatically.
// 2. A commit failure: this one still needs to be resolved.
H
Haojun Liao 已提交
453
// Response callback of a commit message (installed by doSendCommitMsg).
//
// Releases the offset and the response buffers, then counts the response down on the
// shared parameter set. The result code is not inspected: a retry of failed commits
// exists only as disabled code in the history of this function and is not performed.
// todo replace the pTmq with refId
static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
  SMqCommitCbParam*    pCbParam = (SMqCommitCbParam*)param;
  SMqCommitCbParamSet* pSet = (SMqCommitCbParamSet*)pCbParam->params;

  taosMemoryFree(pCbParam->pOffset);
  taosMemoryFree(pBuf->pData);
  taosMemoryFree(pBuf->pEpSet);

  commitRspCountDown(pSet, pCbParam->pTmq->consumerId, pCbParam->topicName, pCbParam->vgId);
  return 0;
}

496
// Builds and asynchronously sends one commit-offset message for pVg.
//
// tmq           consumer issuing the commit
// pVg           vgroup whose currentOffset is committed
// pTopicName    topic the vgroup belongs to
// pParamSet     shared set; waitingRspNum/totalRspNum are incremented once the message is ready
// index         position of pVg in the topic (logging only)
// totalVgroups  number of vgroups in the topic (logging only)
// type          message type placed in the request
//
// Returns TSDB_CODE_SUCCESS after the message was handed to asyncSendMsgToServer,
// TSDB_CODE_OUT_OF_MEMORY or TSDB_CODE_INVALID_PARA otherwise. On failure nothing
// allocated here is left behind and pParamSet is not touched.
static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicName, SMqCommitCbParamSet* pParamSet,
                               int32_t index, int32_t totalVgroups, int32_t type) {
  SMqVgOffset* pOffset = taosMemoryCalloc(1, sizeof(SMqVgOffset));
  if (pOffset == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pOffset->consumerId = tmq->consumerId;
  pOffset->offset.val = pVg->offsetInfo.currentOffset;

  // subKey is "<groupId><TMQ_SEPARATOR><topicName>"
  int32_t groupLen = strlen(tmq->groupId);
  memcpy(pOffset->offset.subKey, tmq->groupId, groupLen);
  pOffset->offset.subKey[groupLen] = TMQ_SEPARATOR;
  strcpy(pOffset->offset.subKey + groupLen + 1, pTopicName);

  int32_t len = 0;
  int32_t code = 0;
  tEncodeSize(tEncodeMqVgOffset, pOffset, len, code);
  if (code < 0) {
    taosMemoryFree(pOffset);  // was leaked on this path
    return TSDB_CODE_INVALID_PARA;
  }

  void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
  if (buf == NULL) {
    taosMemoryFree(pOffset);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  ((SMsgHead*)buf)->vgId = htonl(pVg->vgId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, len);
  code = tEncodeMqVgOffset(&encoder, pOffset);
  tEncoderClear(&encoder);
  if (code < 0) {  // do not send a partially encoded message
    taosMemoryFree(pOffset);
    taosMemoryFree(buf);
    return TSDB_CODE_INVALID_PARA;
  }

  // build param
  SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
  if (pParam == NULL) {
    taosMemoryFree(pOffset);
    taosMemoryFree(buf);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pParam->params = pParamSet;
  pParam->pOffset = pOffset;
  pParam->vgId = pVg->vgId;
  pParam->pTmq = tmq;

  tstrncpy(pParam->topicName, pTopicName, tListLen(pParam->topicName));

  // build send info
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    taosMemoryFree(pOffset);
    taosMemoryFree(buf);
    taosMemoryFree(pParam);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pMsgSendInfo->msgInfo = (SDataBuf) { .pData = buf, .len = sizeof(SMsgHead) + len, .handle = NULL };

  pMsgSendInfo->requestId = generateRequestId();
  pMsgSendInfo->requestObjRefId = 0;
  pMsgSendInfo->param = pParam;
  pMsgSendInfo->paramFreeFp = taosMemoryFree;
  pMsgSendInfo->fp = tmqCommitCb;
  pMsgSendInfo->msgType = type;

  // account for the pending response before the message can be answered
  atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
  atomic_add_fetch_32(&pParamSet->totalRspNum, 1);

  SEp* pEp = GET_ACTIVE_EP(&pVg->epSet);
  char offsetBuf[80] = {0};
  tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffset->offset.val);

  char commitBuf[80] = {0};
  tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset);
  tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d send offset:%s prev:%s, ep:%s:%d, ordinal:%d/%d, req:0x%" PRIx64,
           tmq->consumerId, pOffset->offset.subKey, pVg->vgId, offsetBuf, commitBuf, pEp->fqdn, pEp->port, index + 1,
           totalVgroups, pMsgSendInfo->requestId);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
585 586 587 588 589 590 591 592 593 594 595
// Returns the first subscribed topic whose name equals pTopicName,
// or NULL (after logging an error) when the consumer has no such topic.
static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) {
  int32_t total = taosArrayGetSize(tmq->clientTopics);

  for (int32_t idx = 0; idx < total; ++idx) {
    SMqClientTopic* pCandidate = taosArrayGet(tmq->clientTopics, idx);
    if (strcmp(pCandidate->topicName, pTopicName) == 0) {
      return pCandidate;
    }
  }

  tscError("consumer:0x%" PRIx64 ", total:%d, failed to find topic:%s", tmq->consumerId, total, pTopicName);
  return NULL;
}

600
// Commits the offset of the single (topic, vgroup) that the result object pRes came from.
//
// pCommitFp is invoked directly with
//   - TSDB_CODE_INVALID_PARA     when tmq or pRes is NULL,
//   - TSDB_CODE_TMQ_INVALID_MSG  when pRes is not a TMQ result,
//   - TSDB_CODE_OUT_OF_MEMORY    when the parameter set cannot be allocated,
//   - TSDB_CODE_SUCCESS          when the topic/vgroup is unknown or nothing needs committing,
//   - the error of doSendCommitMsg when the message cannot be built.
// When a commit message was sent, the callback is left to the response path.
static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tmq_commit_cb* pCommitFp, void* userParam) {
  if (pRes == NULL || tmq == NULL) {
    pCommitFp(tmq, TSDB_CODE_INVALID_PARA, userParam);
    return;
  }

  // pick topic name and vgroup id out of whichever result flavor was passed in
  char*   pTopicName = NULL;
  int32_t vgId = 0;
  if (TD_RES_TMQ(pRes)) {
    SMqRspObj* pDataObj = (SMqRspObj*)pRes;
    pTopicName = pDataObj->topic;
    vgId = pDataObj->vgId;
  } else if (TD_RES_TMQ_META(pRes)) {
    SMqMetaRspObj* pMetaObj = (SMqMetaRspObj*)pRes;
    pTopicName = pMetaObj->topic;
    vgId = pMetaObj->vgId;
  } else if (TD_RES_TMQ_METADATA(pRes)) {
    SMqTaosxRspObj* pTaosxObj = (SMqTaosxRspObj*)pRes;
    pTopicName = pTaosxObj->topic;
    vgId = pTaosxObj->vgId;
  } else {
    pCommitFp(tmq, TSDB_CODE_TMQ_INVALID_MSG, userParam);
    return;
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    pCommitFp(tmq, TSDB_CODE_OUT_OF_MEMORY, userParam);
    return;
  }

  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;
  pParamSet->callbackFn = pCommitFp;
  pParamSet->userParam = userParam;

  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);

  tscDebug("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId);

  SMqClientTopic* pTopic = getTopicByName(tmq, pTopicName);
  if (pTopic == NULL) {
    tscWarn("consumer:0x%" PRIx64 " failed to find the specified topic:%s, total topics:%d", tmq->consumerId,
            pTopicName, numOfTopics);
    taosMemoryFree(pParamSet);
    pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam);
    return;
  }

  // locate the vgroup inside the topic
  int32_t      numOfVgroups = taosArrayGetSize(pTopic->vgs);
  int32_t      vgIdx = 0;
  SMqClientVg* pVg = NULL;
  while (vgIdx < numOfVgroups) {
    SMqClientVg* pCur = taosArrayGet(pTopic->vgs, vgIdx);
    if (pCur->vgId == vgId) {
      pVg = pCur;
      break;
    }
    vgIdx++;
  }

  if (pVg == NULL) {
    tscWarn("consumer:0x%" PRIx64 " failed to find the specified vgId:%d, total Vgs:%d, topic:%s", tmq->consumerId,
            vgId, numOfVgroups, pTopicName);
    taosMemoryFree(pParamSet);
    pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam);
    return;
  }

  // only commit when there is an offset and it differs from the committed one
  bool needCommit = pVg->offsetInfo.currentOffset.type > 0 &&
                    !tOffsetEqual(&pVg->offsetInfo.currentOffset, &pVg->offsetInfo.committedOffset);

  int32_t code = 0;
  if (needCommit) {
    code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, vgIdx, numOfVgroups, type);
    if (code == TSDB_CODE_SUCCESS) {
      return;  // message is in flight; pParamSet now belongs to the response path
    }
  }

  // nothing was sent (no commit needed, or building the message failed): call back directly
  taosMemoryFree(pParamSet);
  pCommitFp(tmq, code, userParam);
}

683
// Sends a commit message for every vgroup of every subscribed topic whose current
// offset is set and differs from the committed one.
//
// pCommitFp is invoked directly with TSDB_CODE_OUT_OF_MEMORY when the parameter set
// cannot be allocated, and with TSDB_CODE_SUCCESS when no message was sent at all.
// Otherwise the shared parameter set is handed to commitRspCountDown().
static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) {
  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    pCommitFp(tmq, TSDB_CODE_OUT_OF_MEMORY, userParam);
    return;
  }

  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;
  pParamSet->callbackFn = pCommitFp;
  pParamSet->userParam = userParam;

  // init as 1 to prevent concurrency issue
  pParamSet->waitingRspNum = 1;

  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
  tscDebug("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics);

  for (int32_t i = 0; i < numOfTopics; i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    int32_t         numOfVgroups = taosArrayGetSize(pTopic->vgs);

    tscDebug("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName,
             numOfVgroups);
    for (int32_t j = 0; j < numOfVgroups; j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);

      if (pVg->offsetInfo.currentOffset.type > 0 && !tOffsetEqual(&pVg->offsetInfo.currentOffset, &pVg->offsetInfo.committedOffset)) {
        int32_t code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups, TDMT_VND_TMQ_COMMIT_OFFSET);
        if (code != TSDB_CODE_SUCCESS) {
          // report the code returned by doSendCommitMsg; it does not set terrno
          tscError("consumer:0x%" PRIx64 " topic:%s vgId:%d offset:%" PRId64 " failed, code:%s ordinal:%d/%d",
                   tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->offsetInfo.committedOffset.version, tstrerror(code),
                   j + 1, numOfVgroups);
          continue;
        }

        // update the offset value.
        pVg->offsetInfo.committedOffset = pVg->offsetInfo.currentOffset;
      } else {
        tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, current:%" PRId64 ", ordinal:%d/%d",
                 tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->offsetInfo.currentOffset.version, j + 1, numOfVgroups);
      }
    }
  }

  tscDebug("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - 1,
           numOfTopics);

  // no request is sent
  if (pParamSet->totalRspNum == 0) {
    taosMemoryFree(pParamSet);
    pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam);
    return;
  }

  // count down since waiting rsp num init as 1
  commitRspCountDown(pParamSet, tmq->consumerId, "", 0);
}

742 743
static void generateTimedTask(int64_t refId, int32_t type) {
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
744
  if (tmq != NULL) {
S
Shengliang Guan 已提交
745
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
746
    *pTaskType = type;
747 748
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
wmmhello's avatar
wmmhello 已提交
749
    taosReleaseRef(tmqMgmt.rsetId, refId);
750
  }
751 752 753 754 755
}

// Timer callback: queues an ASK_EP task for the consumer whose ref id is stored in
// param, then frees param (a heap-allocated int64_t).
void tmqAssignAskEpTask(void* param, void* tmrId) {
  int64_t* pRefId = (int64_t*)param;
  generateTimedTask(*pRefId, TMQ_DELAYED_TASK__ASK_EP);
  taosMemoryFree(pRefId);
}

void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
760
  int64_t refId = *(int64_t*)param;
761
  generateTimedTask(refId, TMQ_DELAYED_TASK__COMMIT);
762
  taosMemoryFree(param);
L
Liu Jicong 已提交
763 764 765
}

void tmqAssignDelayedReportTask(void* param, void* tmrId) {
766 767 768
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
S
Shengliang Guan 已提交
769
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
770 771 772 773
    *pTaskType = TMQ_DELAYED_TASK__REPORT;
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
  }
774 775

  taosReleaseRef(tmqMgmt.rsetId, refId);
776
  taosMemoryFree(param);
L
Liu Jicong 已提交
777 778
}

779
// Response callback of the heartbeat request: only releases the response buffers.
// Always returns 0.
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg == NULL) {
    return 0;
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);
  return 0;
}

void tmqSendHbReq(void* param, void* tmrId) {
788
  int64_t refId = *(int64_t*)param;
789

X
Xiaoyu Wang 已提交
790
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
791
  if (tmq == NULL) {
L
Liu Jicong 已提交
792
    taosMemoryFree(param);
793 794
    return;
  }
D
dapan1121 已提交
795 796 797 798 799

  SMqHbReq req = {0};
  req.consumerId = tmq->consumerId;
  req.epoch = tmq->epoch;

L
Liu Jicong 已提交
800
  int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);
D
dapan1121 已提交
801 802
  if (tlen < 0) {
    tscError("tSerializeSMqHbReq failed");
803
    goto OVER;
D
dapan1121 已提交
804
  }
805

L
Liu Jicong 已提交
806
  void* pReq = taosMemoryCalloc(1, tlen);
D
dapan1121 已提交
807 808
  if (tlen < 0) {
    tscError("failed to malloc MqHbReq msg, size:%d", tlen);
809
    goto OVER;
D
dapan1121 已提交
810
  }
811

D
dapan1121 已提交
812 813 814
  if (tSerializeSMqHbReq(pReq, tlen, &req) < 0) {
    tscError("tSerializeSMqHbReq %d failed", tlen);
    taosMemoryFree(pReq);
815
    goto OVER;
D
dapan1121 已提交
816
  }
817 818 819 820

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(pReq);
L
Liu Jicong 已提交
821
    goto OVER;
822
  }
823

824
  sendInfo->msgInfo = (SDataBuf){ .pData = pReq, .len = tlen, .handle = NULL };
825 826 827 828 829

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = NULL;
  sendInfo->fp = tmqHbCb;
L
Liu Jicong 已提交
830
  sendInfo->msgType = TDMT_MND_TMQ_HB;
831 832 833 834 835 836 837

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

OVER:
838
  taosTmrReset(tmqSendHbReq, 1000, param, tmqMgmt.timer, &tmq->hbLiveTimer);
839
  taosReleaseRef(tmqMgmt.rsetId, refId);
840 841
}

842 843
// Fallback commit callback used when the user did not register one:
// a non-zero commit result is only reported in the debug log.
// `param` is unused.
static void defaultCommitCbFn(tmq_t* pTmq, int32_t code, void* param) {
  if (code == 0) {
    return;  // successful commit: nothing to report
  }

  const char* reason = tstrerror(code);
  tscDebug("consumer:0x%" PRIx64 ", failed to commit offset, code:%s", pTmq->consumerId, reason);
}

848
// Drain pTmq->delayedTask and execute every queued task:
//   - TMQ_DELAYED_TASK__ASK_EP: issue an async ask-ep request and re-arm the
//     ep timer to fire again in 1000 (timer units).
//   - TMQ_DELAYED_TASK__COMMIT: asynchronously commit all offsets (using the
//     user's callback, or defaultCommitCbFn when none is set) and re-arm the
//     commit timer with autoCommitInterval.
//   - TMQ_DELAYED_TASK__REPORT: currently a no-op.
//
// Returns TSDB_CODE_SUCCESS (0) normally, or TSDB_CODE_OUT_OF_MEMORY when the
// temporary qall cannot be allocated.
int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
  STaosQall* qall = taosAllocateQall();
  if (qall == NULL) {
    // previously dereferenced unchecked below (qall->numOfItems)
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  taosReadAllQitems(pTmq->delayedTask, qall);

  if (qall->numOfItems == 0) {
    taosFreeQall(qall);
    return TSDB_CODE_SUCCESS;
  }

  tscDebug("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, qall->numOfItems);
  int8_t* pTaskType = NULL;
  taosGetQitem(qall, (void**)&pTaskType);

  while (pTaskType != NULL) {
    if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
      asyncAskEp(pTmq, addToQueueCallbackFn, NULL);

      // the timer callback receives its own heap copy of the ref id
      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      if (pRefId == NULL) {
        tscError("consumer:0x%" PRIx64 " failed to alloc timer param, ask-ep timer not re-armed", pTmq->consumerId);
      } else {
        *pRefId = pTmq->refId;

        tscDebug("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId);
        taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer);
      }
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
      tmq_commit_cb* pCallbackFn = pTmq->commitCb ? pTmq->commitCb : defaultCommitCbFn;

      asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam);

      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      if (pRefId == NULL) {
        tscError("consumer:0x%" PRIx64 " failed to alloc timer param, commit timer not re-armed", pTmq->consumerId);
      } else {
        *pRefId = pTmq->refId;

        tscDebug("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId,
                 pTmq->autoCommitInterval / 1000.0);
        taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer);
      }
    } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
      // nothing to do for report tasks yet
    }

    taosFreeQitem(pTaskType);
    taosGetQitem(qall, (void**)&pTaskType);
  }

  taosFreeQall(qall);
  return 0;
}

891
// Release the payload owned by a response wrapper, chosen by its tmqRspType.
// The wrapper itself is NOT freed here; callers release it separately
// (see taosFreeQitem in tmqClearUnhandleMsg). Always returns NULL.
static void* tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
  switch (rspWrapper->tmqRspType) {
    case TMQ_MSG_TYPE__EP_RSP: {
      SMqAskEpRspWrapper* pEpWrapper = (SMqAskEpRspWrapper*)rspWrapper;
      tDeleteSMqAskEpRsp(&pEpWrapper->msg);
      break;
    }

    case TMQ_MSG_TYPE__POLL_RSP: {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)rspWrapper;
      taosMemoryFreeClear(pPoll->pEpset);

      taosArrayDestroyP(pPoll->dataRsp.blockData, taosMemoryFree);
      taosArrayDestroy(pPoll->dataRsp.blockDataLen);
      taosArrayDestroyP(pPoll->dataRsp.blockTbName, taosMemoryFree);
      taosArrayDestroyP(pPoll->dataRsp.blockSchema, (FDelete)tDeleteSchemaWrapper);
      break;
    }

    case TMQ_MSG_TYPE__POLL_META_RSP: {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)rspWrapper;
      taosMemoryFreeClear(pPoll->pEpset);

      taosMemoryFree(pPoll->metaRsp.metaRsp);
      break;
    }

    case TMQ_MSG_TYPE__TAOSX_RSP: {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)rspWrapper;
      taosMemoryFreeClear(pPoll->pEpset);

      taosArrayDestroyP(pPoll->taosxRsp.blockData, taosMemoryFree);
      taosArrayDestroy(pPoll->taosxRsp.blockDataLen);
      taosArrayDestroyP(pPoll->taosxRsp.blockTbName, taosMemoryFree);
      taosArrayDestroyP(pPoll->taosxRsp.blockSchema, (FDelete)tDeleteSchemaWrapper);
      // taosx-only members
      taosArrayDestroy(pPoll->taosxRsp.createTableLen);
      taosArrayDestroyP(pPoll->taosxRsp.createTableReq, taosMemoryFree);
      break;
    }

    case TMQ_MSG_TYPE__END_RSP:  // carries no payload
    default:                     // unknown types are left untouched
      break;
  }

  return NULL;
}

L
Liu Jicong 已提交
926
// Discard every response that has not been consumed yet.
// Pass 0 empties whatever is already staged in tmq->qall; pass 1 moves the
// remaining items of tmq->mqueue into tmq->qall and empties it again.
void tmqClearUnhandleMsg(tmq_t* tmq) {
  for (int32_t pass = 0; pass < 2; ++pass) {
    SMqRspWrapper* pItem = NULL;

    if (pass == 1) {
      taosReadAllQitems(tmq->mqueue, tmq->qall);
    }

    for (;;) {
      taosGetQitem(tmq->qall, (void**)&pItem);
      if (pItem == NULL) {
        break;
      }

      // free the payload first, then the queue item that carried it
      tmqFreeRspWrapper(pItem);
      taosFreeQitem(pItem);
    }
  }
}

D
dapan1121 已提交
951
// Response callback for the subscribe request sent by tmq_subscribe().
// Stores the result code in the caller's SMqSubscribeCbParam and wakes the
// waiting thread by posting rspSem. Always returns 0.
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
  pParam->rspErr = code;

  // The callback owns the response buffers, like the other callbacks in this
  // file (tmqHbCb, tmqPollCb); release both and tolerate a missing message.
  if (pMsg) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
  }

  // post last: pParam lives on the waiter's stack and may vanish once it wakes
  tsem_post(&pParam->rspSem);
  return 0;
}
959

L
Liu Jicong 已提交
960
// Append the names of all currently subscribed topics to *topics.
// When *topics is NULL a new list is created and returned through it.
// Stored topic names are qualified ("<prefix>.<topic>"); only the part after
// the first '.' is reported. A name without '.' is reported unchanged.
//
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY if the list cannot be created.
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  if (*topics == NULL) {
    *topics = tmq_list_new();
    if (*topics == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < numOfTopics; i++) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);

    // strchr() yields NULL when there is no '.'; `NULL + 1` would be undefined
    char* pDot = strchr(topic->topicName, '.');
    char* shortName = (pDot != NULL) ? pDot + 1 : topic->topicName;
    tmq_list_append(*topics, shortName);
  }
  return 0;
}

L
Liu Jicong 已提交
971
// Unsubscribe from everything by subscribing to an empty topic list.
// While the mnode answers TSDB_CODE_MND_CONSUMER_NOT_READY the request is
// repeated after a 500ms pause, for at most 6 extra attempts.
// Returns the code of the last tmq_subscribe() call.
int32_t tmq_unsubscribe(tmq_t* tmq) {
  tmq_list_t* emptyList = tmq_list_new();

  int32_t code = tmq_subscribe(tmq, emptyList);
  for (int32_t attempt = 0; code == TSDB_CODE_MND_CONSUMER_NOT_READY && attempt <= 5; ++attempt) {
    taosMsleep(500);
    code = tmq_subscribe(tmq, emptyList);
  }

  tmq_list_destroy(emptyList);
  return code;
}

989 990 991 992 993 994
static void freeClientVgImpl(void* param) {
  SMqClientTopic* pTopic = param;
  taosMemoryFreeClear(pTopic->schema.pSchema);
  taosArrayDestroy(pTopic->vgs);
}

995
void tmqFreeImpl(void* handle) {
996 997
  tmq_t*  tmq = (tmq_t*)handle;
  int64_t id = tmq->consumerId;
L
Liu Jicong 已提交
998

999
  // TODO stop timer
L
Liu Jicong 已提交
1000 1001 1002 1003
  if (tmq->mqueue) {
    tmqClearUnhandleMsg(tmq);
    taosCloseQueue(tmq->mqueue);
  }
L
Liu Jicong 已提交
1004

H
Haojun Liao 已提交
1005 1006 1007 1008 1009
  if (tmq->delayedTask) {
    taosCloseQueue(tmq->delayedTask);
  }

  taosFreeQall(tmq->qall);
1010
  tsem_destroy(&tmq->rspSem);
L
Liu Jicong 已提交
1011

1012
  taosArrayDestroyEx(tmq->clientTopics, freeClientVgImpl);
1013 1014
  taos_close_internal(tmq->pTscObj);
  taosMemoryFree(tmq);
1015 1016

  tscDebug("consumer:0x%" PRIx64 " closed", id);
L
Liu Jicong 已提交
1017 1018
}

1019 1020 1021 1022 1023 1024 1025 1026 1027
static void tmqMgmtInit(void) {
  tmqInitRes = 0;
  tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");

  if (tmqMgmt.timer == NULL) {
    tmqInitRes = TSDB_CODE_OUT_OF_MEMORY;
  }

  tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
1028
  if (tmqMgmt.rsetId < 0) {
1029 1030 1031 1032
    tmqInitRes = terrno;
  }
}

L
Liu Jicong 已提交
1033
// Create a consumer from `conf`.
//
// Steps: one-time module init, allocate the tmq_t, init its response
// semaphore, create the queues/arrays, copy the configuration, connect to the
// server, register the object in the TMQ ref set and, when conf->hbBgEnable is
// set, start the background heartbeat timer.
//
// conf:      consumer configuration; groupId must not be empty.
// errstr,
// errstrLen: currently unused by this function.
//
// Returns the new consumer, or NULL on failure (terrno is set on the paths
// that assign it below).
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  int64_t* pHbRefId = NULL;  // timer param for the heartbeat, owned by the timer once started

  taosThreadOnce(&tmqInit, tmqMgmtInit);
  if (tmqInitRes != 0) {
    terrno = tmqInitRes;
    return NULL;
  }

  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tscError("failed to create consumer, groupId:%s, code:%s", conf->groupId, terrstr());
    return NULL;
  }

  // assign consumerId early so that every later log line carries it
  pTmq->consumerId = tGenIdPI64();

  // Init the semaphore before anything that can jump to _failed: tmqFreeImpl
  // destroys it unconditionally, so it must be valid on every such path.
  if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
    tscError("consumer:0x%" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             conf->groupId);
    taosMemoryFree(pTmq);
    return NULL;
  }

  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  pTmq->mqueue = taosOpenQueue();
  pTmq->delayedTask = taosOpenQueue();
  pTmq->qall = taosAllocateQall();

  if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL ||
      conf->groupId[0] == 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // pTmq->groupId has not been filled in yet, report the configured one
    tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), conf->groupId);
    goto _failed;
  }

  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;

  // set conf (bounded copies; the destinations are fixed-size arrays in tmq_t)
  tstrncpy(pTmq->clientId, conf->clientId, sizeof(pTmq->clientId));
  tstrncpy(pTmq->groupId, conf->groupId, sizeof(pTmq->groupId));
  pTmq->withTbName = conf->withTbName;
  pTmq->useSnapshot = conf->snapEnable;
  pTmq->autoCommit = conf->autoCommit;
  pTmq->autoCommitInterval = conf->autoCommitInterval;
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
  pTmq->resetOffsetCfg = conf->resetOffset;
  taosInitRWLatch(&pTmq->lock);

  pTmq->hbBgEnable = conf->hbBgEnable;

  // init connection
  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) {
    tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId);
    // no tsem_destroy here: tmqFreeImpl (via _failed) already destroys rspSem
    goto _failed;
  }

  // Allocate the heartbeat timer param before the object is published in the
  // ref set, so an allocation failure can still take the plain _failed path.
  if (pTmq->hbBgEnable) {
    pHbRefId = taosMemoryMalloc(sizeof(int64_t));
    if (pHbRefId == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId);
      goto _failed;
    }
  }

  pTmq->refId = taosAddRef(tmqMgmt.rsetId, pTmq);
  if (pTmq->refId < 0) {
    taosMemoryFree(pHbRefId);
    goto _failed;
  }

  if (pHbRefId != NULL) {
    *pHbRefId = pTmq->refId;
    pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pHbRefId, tmqMgmt.timer);
  }

  char         buf[80] = {0};
  STqOffsetVal offset = {.type = pTmq->resetOffsetCfg};
  tFormatOffset(buf, tListLen(buf), &offset);
  tscInfo("consumer:0x%" PRIx64 " is setup, refId:%" PRId64
          ", groupId:%s, snapshot:%d, autoCommit:%d, commitInterval:%dms, offset:%s, backgroudHB:%d",
          pTmq->consumerId, pTmq->refId, pTmq->groupId, pTmq->useSnapshot, pTmq->autoCommit, pTmq->autoCommitInterval,
          buf, pTmq->hbBgEnable);

  return pTmq;

_failed:
  tmqFreeImpl(pTmq);
  return NULL;
}

L
Liu Jicong 已提交
1125
// Subscribe the consumer to the topics in `topic_list` (an empty list
// unsubscribes, see tmq_unsubscribe).
//
// The request is sent to the mnode and this call blocks until tmqSubscribeCb
// posts the response semaphore. Afterwards doAskEp() is polled until the mnode
// stops answering TSDB_CODE_MND_CONSUMER_NOT_READY, and the ask-ep timer and
// (if autoCommit is on) the auto-commit timer are started when not yet running.
//
// Returns 0 on success or a TSDB error code.
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
  // upper bound on doAskEp() retries; each retry sleeps 500ms (see loop below)
  const int32_t   MAX_RETRY_COUNT = 120 * 60;
  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
  SMsgSendInfo*   sendInfo = NULL;
  SCMSubscribeReq req = {0};
  int32_t         code = 0;

  tscDebug("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz);

  req.consumerId = tmq->consumerId;
  tstrncpy(req.clientId, tmq->clientId, 256);
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
  req.topicNames = taosArrayInit(sz, sizeof(void*));

  if (req.topicNames == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  // expand every topic into its full name; the strings are owned by req.topicNames
  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);

    SName name = {0};
    tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));
    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      // without this the function used to return 0 (success) on allocation failure
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }

    tNameExtractFullName(&name, topicFName);
    tscDebug("consumer:0x%" PRIx64 " subscribe topic:%s", tmq->consumerId, topicFName);

    taosArrayPush(req.topicNames, &topicFName);
  }

  // first pass computes the encoded length, second pass fills the buffer
  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  SMqSubscribeCbParam param = { .rspErr = 0};
  if (tsem_init(&param.rspSem, 0, 0) != 0) {
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
    goto FAIL;
  }

  sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL};

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
  sendInfo->msgType = TDMT_MND_TMQ_SUBSCRIBE;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  // avoid double free if msg is sent
  buf = NULL;
  sendInfo = NULL;

  // `param` lives on this stack frame, so wait for the callback before leaving
  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);

  if (param.rspErr != 0) {
    code = param.rspErr;
    goto FAIL;
  }

  int32_t retryCnt = 0;
  while (TSDB_CODE_MND_CONSUMER_NOT_READY == doAskEp(tmq)) {
    if (retryCnt++ > MAX_RETRY_COUNT) {
      tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt);
      code = TSDB_CODE_MND_CONSUMER_NOT_READY;
      goto FAIL;
    }

    tscDebug("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt);
    taosMsleep(500);
  }

  // init ep timer
  if (tmq->epTimer == NULL) {
    int64_t* pRefId1 = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId1 == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
    *pRefId1 = tmq->refId;
    tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, 1000, pRefId1, tmqMgmt.timer);
  }

  // init auto commit timer
  if (tmq->autoCommit && tmq->commitTimer == NULL) {
    int64_t* pRefId2 = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId2 == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
    *pRefId2 = tmq->refId;
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId2, tmqMgmt.timer);
  }

  // the success path falls through: FAIL is the common cleanup
FAIL:
  taosArrayDestroyP(req.topicNames, taosMemoryFree);
  taosMemoryFree(buf);
  taosMemoryFree(sendInfo);

  return code;
}

L
Liu Jicong 已提交
1244
// Register the callback (and its opaque user parameter) stored in the
// configuration as commitCb / commitCbUserParam; tmq_consumer_new() copies
// both into the consumer. A NULL `conf` is ignored.
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
  if (conf == NULL) {
    return;
  }

  conf->commitCb = cb;
  conf->commitCbUserParam = param;
}
1248

wmmhello's avatar
wmmhello 已提交
1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274 1275 1276
// Look up the vgroup entry `vgId` of topic `topicName` in tmq->clientTopics.
// Every topic whose name matches is searched. Returns a pointer into the
// topic's vgs array, or NULL when no matching entry exists.
static SMqClientVg* getVgInfo(tmq_t* tmq, char* topicName, int32_t vgId) {
  const int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);

  for (int t = 0; t < numOfTopics; ++t) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, t);
    if (strcmp(pTopic->topicName, topicName) != 0) {
      continue;  // different topic
    }

    const int32_t numOfVgs = taosArrayGetSize(pTopic->vgs);
    for (int32_t v = 0; v < numOfVgs; ++v) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, v);
      if (pVg->vgId == vgId) {
        return pVg;
      }
    }
  }

  return NULL;
}

// Return the first entry of tmq->clientTopics whose topicName equals
// `topicName`, or NULL when there is none.
static SMqClientTopic* getTopicInfo(tmq_t* tmq, char* topicName) {
  const int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);

  for (int idx = 0; idx < numOfTopics; ++idx) {
    SMqClientTopic* pCandidate = taosArrayGet(tmq->clientTopics, idx);
    if (strcmp(pCandidate->topicName, topicName) != 0) {
      continue;
    }
    return pCandidate;
  }

  return NULL;
}

D
dapan1121 已提交
1277
// Response callback for a poll request.
//
// param: SMqPollCbParam allocated by the sender; always freed here.
// pMsg:  response buffers; pData is always freed here, pEpSet is either freed
//        or handed over to the queued response wrapper.
// code:  result of the request.
//
// On success the response is decoded into an SMqPollRspWrapper, pushed to
// tmq->mqueue and rspSem is posted. Responses from an older epoch are dropped.
// On failure the vgroup is put back to TMQ_VG_STATUS__IDLE (only if the request
// epoch is still current) so that it can be polled again.
//
// Returns 0 when the response was queued or deliberately dropped, -1 otherwise.
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;

  int64_t refId = pParam->refId;

  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
    taosMemoryFree(pParam);
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    return -1;
  }

  int32_t  epoch = pParam->epoch;
  int32_t  vgId = pParam->vgId;
  uint64_t requestId = pParam->requestId;

  if (code != 0) {
    if (pMsg->pData) taosMemoryFree(pMsg->pData);
    if (pMsg->pEpSet) taosMemoryFree(pMsg->pEpSet);

    if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
      // consumer mismatch: flag the consumer as RECOVER (no sleep happens here)
      atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
      tscDebug("consumer:0x%" PRIx64 " wait for the re-balance, wait for 500ms and set status to be RECOVER",
               tmq->consumerId);
    } else if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      // tell the poller there is nothing more to read via an END_RSP marker
      SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
      if (pRspWrapper == NULL) {
        tscWarn("consumer:0x%" PRIx64 " msg from vgId:%d discarded, epoch %d since out of memory, reqId:0x%" PRIx64,
                tmq->consumerId, vgId, epoch, requestId);
        goto CREATE_MSG_FAIL;
      }

      pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
      taosWriteQitem(tmq->mqueue, pRspWrapper);
    } else {
      tscError("consumer:0x%" PRIx64 " msg from vgId:%d discarded, epoch %d, since %s, reqId:0x%" PRIx64, tmq->consumerId,
               vgId, epoch, tstrerror(code), requestId);
    }

    goto CREATE_MSG_FAIL;
  }

  // the head is read below: make sure it is really there
  if (pMsg->pData == NULL || (size_t)pMsg->len < sizeof(SMqRspHead)) {
    tscError("consumer:0x%" PRIx64 " msg from vgId:%d discarded, epoch %d, since rsp is too short, reqId:0x%" PRIx64,
             tmq->consumerId, vgId, epoch, requestId);
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    goto CREATE_MSG_FAIL;
  }

  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t clientEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch < clientEpoch) {
    // do not write into queue since updating epoch reset
    tscWarn("consumer:0x%" PRIx64
            " msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d, reqId:0x%" PRIx64,
            tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId);

    tsem_post(&tmq->rspSem);
    taosReleaseRef(tmqMgmt.rsetId, refId);

    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    taosMemoryFree(pParam);

    return 0;
  }

  if (msgEpoch != clientEpoch) {
    tscWarn("consumer:0x%" PRIx64 " mismatch rsp from vgId:%d, epoch %d, current epoch %d, reqId:0x%" PRIx64,
            tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId);
  }

  // handle meta rsp
  int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;

  SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
  if (pRspWrapper == NULL) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, epoch %d since out of memory", tmq->consumerId, vgId,
            epoch);
    goto CREATE_MSG_FAIL;
  }

  pRspWrapper->tmqRspType = rspType;
  pRspWrapper->reqId = requestId;
  pRspWrapper->pEpset = pMsg->pEpSet;  // ownership moves to the wrapper
  pRspWrapper->vgId = vgId;
  tstrncpy(pRspWrapper->topicName, pParam->topicName, sizeof(pRspWrapper->topicName));

  pMsg->pEpSet = NULL;

  // In every branch the body after the head is decoded first, then the head
  // itself is copied over the start of the decoded struct.
  if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeMqDataRsp(&decoder, &pRspWrapper->dataRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));

    char buf[80];
    tFormatOffset(buf, 80, &pRspWrapper->dataRsp.rspOffset);
    tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req ver:%" PRId64 ", rsp:%s type %d, reqId:0x%" PRIx64,
             tmq->consumerId, vgId, pRspWrapper->dataRsp.reqOffset.version, buf, rspType, requestId);
  } else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeMqMetaRsp(&decoder, &pRspWrapper->metaRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
  } else if (rspType == TMQ_MSG_TYPE__TAOSX_RSP) {
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSTaosxRsp(&decoder, &pRspWrapper->taosxRsp);
    tDecoderClear(&decoder);
    memcpy(&pRspWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead));
  } else {  // invalid rspType
    tscError("consumer:0x%" PRIx64 " invalid rsp msg received, type:%d ignored", tmq->consumerId, rspType);
  }

  taosMemoryFree(pMsg->pData);
  taosWriteQitem(tmq->mqueue, pRspWrapper);

  int32_t total = taosQueueItemSize(tmq->mqueue);
  tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64,
           tmq->consumerId, rspType, vgId, total, requestId);

  tsem_post(&tmq->rspSem);
  taosReleaseRef(tmqMgmt.rsetId, refId);
  taosMemoryFree(pParam);

  return 0;

CREATE_MSG_FAIL:
  // only touch the vgroup if the topic layout this request was built for is still current
  if (epoch == atomic_load_32(&tmq->epoch)) {
    taosWLockLatch(&tmq->lock);
    SMqClientVg* pVg = getVgInfo(tmq, pParam->topicName, vgId);
    if (pVg) {
      atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
    }
    taosWUnLockLatch(&tmq->lock);
  }

  tsem_post(&tmq->rspSem);
  taosReleaseRef(tmqMgmt.rsetId, refId);
  taosMemoryFree(pParam);

  return -1;
}

H
Haojun Liao 已提交
1428 1429 1430 1431 1432
// Per-vgroup state that doUpdateLocalEp() saves from the current topic list
// (keyed by the topic/vgroup key) and initClientTopicFromRsp() restores into
// the rebuilt list.
typedef struct SVgroupSaveInfo {
  STqOffsetVal offset;     // copied from the vgroup's offsetInfo.currentOffset
  int64_t      numOfRows;  // copied from the vgroup's numOfRows
} SVgroupSaveInfo;

H
Haojun Liao 已提交
1433 1434 1435 1436 1437 1438
// Build the client-side topic `pTopic` from one topic entry of an ask-ep
// response.
//   - The schema is moved out of pTopicEp (its nCols/pSchema are reset).
//   - One SMqClientVg is created per vgroup in pTopicEp->vgs. If
//     pVgOffsetHashMap holds a saved entry for the topic/vgroup key, its offset
//     and row count are reused; otherwise the offset starts from
//     tmq->resetOffsetCfg and the row count from 0.
static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopicEp, SHashObj* pVgOffsetHashMap,
                                   tmq_t* tmq) {
  // take over the schema so that it is not released together with the response
  pTopic->schema = pTopicEp->schema;
  pTopicEp->schema.nCols = 0;
  pTopicEp->schema.pSchema = NULL;

  const int32_t numOfVgs = taosArrayGetSize(pTopicEp->vgs);

  tstrncpy(pTopic->topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pTopic->db, pTopicEp->db, TSDB_DB_FNAME_LEN);

  tscDebug("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d", tmq->consumerId, pTopic->topicName, numOfVgs);
  pTopic->vgs = taosArrayInit(numOfVgs, sizeof(SMqClientVg));

  char key[TSDB_TOPIC_FNAME_LEN + 22];
  for (int32_t idx = 0; idx < numOfVgs; ++idx) {
    SMqSubVgEp* pEp = taosArrayGet(pTopicEp->vgs, idx);

    makeTopicVgroupKey(key, pTopic->topicName, pEp->vgId);
    SVgroupSaveInfo* pSaved = taosHashGet(pVgOffsetHashMap, key, strlen(key));

    // default: start from the configured reset offset with no rows consumed
    STqOffsetVal startOffset = {.type = tmq->resetOffsetCfg};
    int64_t      rows = 0;
    if (pSaved != NULL) {
      startOffset = pSaved->offset;
      rows = pSaved->numOfRows;
    }

    SMqClientVg vg = {
        .pollCnt = 0,
        .vgId = pEp->vgId,
        .epSet = pEp->epSet,
        .vgStatus = TMQ_VG_STATUS__IDLE,
        .vgSkipCnt = 0,
        .emptyBlockReceiveTs = 0,
        .numOfRows = rows,
        .offsetInfo.currentOffset = startOffset,
        .offsetInfo.committedOffset = startOffset,
        .offsetInfo.walVerBegin = -1,
        .offsetInfo.walVerEnd = -1,
        .seekUpdated = false,
        .receivedInfoFromVnode = false,
    };

    taosArrayPush(pTopic->vgs, &vg);
  }
}

static void freeClientVgInfo(void* param) {
  SMqClientTopic* pTopic = param;
  if (pTopic->schema.nCols) {
    taosMemoryFreeClear(pTopic->schema.pSchema);
  }

  taosArrayDestroy(pTopic->vgs);
}

1491
// Replace the consumer's topic/vgroup layout with the one in `pRsp`, provided
// `epoch` is newer than tmq->epoch.
//
// The current offset and row count of every existing vgroup are saved in a
// temporary hash (keyed by topic/vgroup key) so that initClientTopicFromRsp()
// can carry them over. The old topic array is swapped out under tmq->lock,
// then status (NO_TOPIC when the response has no topics, READY otherwise) and
// epoch are published.
//
// NOTE(review): the result flag is never raised anywhere in this function, so
// it returns false on every path -- confirm whether callers rely on that.
static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
  const int32_t numOfOldTopics = taosArrayGetSize(tmq->clientTopics);
  const int32_t numOfNewTopics = taosArrayGetSize(pRsp->topics);

  tscDebug("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d",
           tmq->consumerId, tmq->epoch, epoch, numOfNewTopics, numOfOldTopics);
  if (epoch <= tmq->epoch) {
    return false;  // stale or duplicate response
  }

  SArray* pNewTopics = taosArrayInit(numOfNewTopics, sizeof(SMqClientTopic));
  if (pNewTopics == NULL) {
    return false;
  }

  SHashObj* pSavedMap = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pSavedMap == NULL) {
    taosArrayDestroy(pNewTopics);
    return false;
  }

  // step 1: remember the progress of every vgroup we currently know about
  char key[TSDB_TOPIC_FNAME_LEN + 22];
  for (int32_t t = 0; t < numOfOldTopics; ++t) {
    SMqClientTopic* pOld = taosArrayGet(tmq->clientTopics, t);
    if (pOld->vgs == NULL) {
      continue;
    }

    const int32_t numOfVgs = taosArrayGetSize(pOld->vgs);
    tscDebug("consumer:0x%" PRIx64 ", current vg num: %d", tmq->consumerId, numOfVgs);

    for (int32_t v = 0; v < numOfVgs; ++v) {
      SMqClientVg* pVg = taosArrayGet(pOld->vgs, v);
      makeTopicVgroupKey(key, pOld->topicName, pVg->vgId);

      char offsetStr[80];
      tFormatOffset(offsetStr, 80, &pVg->offsetInfo.currentOffset);
      tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVg->vgId,
               key, offsetStr);

      SVgroupSaveInfo saved = {.offset = pVg->offsetInfo.currentOffset, .numOfRows = pVg->numOfRows};
      taosHashPut(pSavedMap, key, strlen(key), &saved, sizeof(SVgroupSaveInfo));
    }
  }

  // step 2: build the new topic list, reusing saved progress where keys match
  for (int32_t t = 0; t < numOfNewTopics; ++t) {
    SMqSubTopicEp* pEp = taosArrayGet(pRsp->topics, t);
    SMqClientTopic fresh = {0};
    initClientTopicFromRsp(&fresh, pEp, pSavedMap, tmq);
    taosArrayPush(pNewTopics, &fresh);
  }

  taosHashCleanup(pSavedMap);

  // step 3: swap the lists under the write latch
  taosWLockLatch(&tmq->lock);
  if (tmq->clientTopics != NULL) {
    taosArrayDestroyEx(tmq->clientTopics, freeClientVgInfo);
  }
  tmq->clientTopics = pNewTopics;
  taosWUnLockLatch(&tmq->lock);

  // step 4: publish status first, then the new epoch
  int8_t newStatus = TMQ_CONSUMER_STATUS__READY;
  if (numOfNewTopics == 0) {
    newStatus = TMQ_CONSUMER_STATUS__NO_TOPIC;
  }
  atomic_store_8(&tmq->status, newStatus);
  atomic_store_32(&tmq->epoch, epoch);

  tscDebug("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId);
  return false;
}

1562
/*
 * Transport callback for the "ask endpoint" request.
 *
 * param: SMqAskEpCbParam* holding the consumer refId, the user callback and its argument.
 *        It is freed here on every path.
 * pMsg:  response buffer. pMsg->pData and pMsg->pEpSet are freed here on every path.
 * code:  result code reported for the request.
 *
 * Returns TSDB_CODE_TMQ_CONSUMER_CLOSED (also stored in terrno) when the consumer can no
 * longer be acquired through its refId; otherwise returns `code` unchanged.
 */
int32_t askEpCallbackFn(void* param, SDataBuf* pMsg, int32_t code) {
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
  tmq_t*           tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);

  if (tmq == NULL) {
    // the consumer cannot be acquired any more; the user callback is not invoked on this path
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    goto _cleanup;
  }

  if (code != TSDB_CODE_SUCCESS) {
    tscError("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code));
    pParam->pUserFn(tmq, code, NULL, pParam->pParam);
  } else {
    // tmq's epoch is monotonically increasing, so it is safe to discard any message that
    // carries an old epoch. The epoch only advances when a newer ep message is received.
    SMqRspHead* head = pMsg->pData;
    int32_t     epoch = atomic_load_32(&tmq->epoch);

    if (head->epoch <= epoch) {
      tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, no need to update local ep",
               tmq->consumerId, head->epoch, epoch);

      if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
        // zero-initialized so that a failed/partial decode cannot leave a garbage `topics`
        // pointer to be read below and released by tDeleteSMqAskEpRsp
        SMqAskEpRsp rsp = {0};
        tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
        int8_t flag = (taosArrayGetSize(rsp.topics) == 0) ? TMQ_CONSUMER_STATUS__NO_TOPIC : TMQ_CONSUMER_STATUS__READY;
        atomic_store_8(&tmq->status, flag);
        tDeleteSMqAskEpRsp(&rsp);
      }
    } else {
      tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId,
               head->epoch, epoch);
    }

    // the user callback receives the raw message; it must run before the buffers are freed
    pParam->pUserFn(tmq, code, pMsg, pParam->pParam);
  }

  taosReleaseRef(tmqMgmt.rsetId, pParam->refId);

_cleanup:
  taosMemoryFree(pMsg->pEpSet);
  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pParam);
  return code;
}

L
Liu Jicong 已提交
1619
/*
 * Fills a poll request for one vgroup of one topic.
 *
 * The subscription key is written as "<groupId><TMQ_SEPARATOR><topicName>". The remaining
 * fields are copied from the consumer, the vgroup and the caller-supplied timeout, and a
 * fresh request id is generated for every call.
 */
void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
  // position of the separator == length of the group id
  int32_t sepPos = strlen(tmq->groupId);

  memcpy(pReq->subKey, tmq->groupId, sepPos);
  pReq->subKey[sepPos] = TMQ_SEPARATOR;
  strcpy(&pReq->subKey[sepPos + 1], pTopic->topicName);

  // addressing and position within the vgroup
  pReq->head.vgId = pVg->vgId;
  pReq->reqOffset = pVg->offsetInfo.currentOffset;

  // consumer identity and options
  pReq->consumerId = tmq->consumerId;
  pReq->epoch = tmq->epoch;
  pReq->withTbName = tmq->withTbName;
  pReq->useSnapshot = tmq->useSnapshot;

  pReq->timeout = timeout;
  pReq->reqId = generateRequestId();
}

L
Liu Jicong 已提交
1635 1636
/*
 * Builds a meta result object (RES_TYPE__TMQ_META) from a poll response wrapper.
 *
 * The topic name, db name and vgId are taken from the wrapper's topic/vgroup handles and
 * the wrapper's metaRsp is copied byte-wise into the new object.
 *
 * Returns the newly allocated object, or NULL if the allocation fails.
 */
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
  if (pRspObj == NULL) {
    // out of memory: report "no result" instead of dereferencing NULL below
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ_META;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;

  memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp));
  return pRspObj;
}

1646
/*
 * Builds a data result object (RES_TYPE__TMQ) from a poll response wrapper.
 *
 * pWrapper:  poll response; its dataRsp is copied byte-wise into the result object.
 * pVg:       vgroup whose numOfRows counter is increased by the rows in this packet.
 * numOfRows: out parameter; set to the number of rows in this packet (0 on failure).
 *
 * Returns the newly allocated object, or NULL if the allocation fails.
 */
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) {
  (*numOfRows) = 0;

  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  if (pRspObj == NULL) {
    // out of memory: report "no result" instead of dereferencing NULL below
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);

  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;

  // when the response carries no schema of its own, fall back to the topic's schema
  if (!pWrapper->dataRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  // extract the rows in this data packet
  for (int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) {
    SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pRspObj->rsp.blockData, i);
    int64_t            rows = htobe64(pRetrieve->numOfRows);  // row count is byte-swapped before use
    pVg->numOfRows += rows;
    (*numOfRows) += rows;
  }

  return pRspObj;
}

L
Liu Jicong 已提交
1676 1677
/*
 * Builds a metadata result object (RES_TYPE__TMQ_METADATA) from a poll response wrapper.
 *
 * The topic name, db name and vgId come from the wrapper's topic/vgroup handles and the
 * wrapper's taosxRsp is copied byte-wise into the new object.
 *
 * Returns the newly allocated object, or NULL if the allocation fails.
 */
SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj));
  if (pRspObj == NULL) {
    // out of memory: report "no result" instead of dereferencing NULL below
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ_METADATA;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;

  // when the response carries no schema of its own, fall back to the topic's schema
  if (!pWrapper->taosxRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  return pRspObj;
}

1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726
/*
 * Common failure path used before a poll request has been sent: puts the vgroup back
 * into the idle state, posts the consumer's response semaphore, and reports -1.
 */
static int32_t handleErrorBeforePoll(SMqClientVg* pVg, tmq_t* pTmq) {
  const int32_t failed = -1;

  atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  tsem_post(&pTmq->rspSem);

  return failed;
}

/*
 * Serializes and sends one poll request for the given topic/vgroup.
 *
 * On any failure before the request is handed to the transport, everything allocated so
 * far is released and the result of handleErrorBeforePoll() is returned. On success the
 * per-vgroup and per-consumer poll counters are incremented, the vgroup's seekUpdated
 * flag is cleared, and TSDB_CODE_SUCCESS is returned.
 */
static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* pVg, int64_t timeout) {
  char*           pBuf = NULL;
  SMqPollCbParam* pCbParam = NULL;
  SMsgSendInfo*   pSend = NULL;

  SMqPollReq pollReq = {0};
  tmqBuildConsumeReqImpl(&pollReq, pTmq, timeout, pTopic, pVg);

  // first pass computes the encoded size, second pass writes into the buffer
  int32_t bufLen = tSerializeSMqPollReq(NULL, 0, &pollReq);
  if (bufLen < 0) {
    goto _fail;
  }

  pBuf = taosMemoryCalloc(1, bufLen);
  if (pBuf == NULL) {
    goto _fail;
  }

  if (tSerializeSMqPollReq(pBuf, bufLen, &pollReq) < 0) {
    goto _free_buf;
  }

  pCbParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
  if (pCbParam == NULL) {
    goto _free_buf;
  }

  // The callback parameter carries the topic name and vgId by value rather than pointers
  // to pTopic/pVg, because the vgroup object may be released before the callback runs.
  pCbParam->refId = pTmq->refId;
  pCbParam->epoch = pTmq->epoch;
  strcpy(pCbParam->topicName, pTopic->topicName);
  pCbParam->vgId = pVg->vgId;
  pCbParam->requestId = pollReq.reqId;

  pSend = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pSend == NULL) {
    goto _free_param;
  }

  pSend->msgInfo = (SDataBuf){.pData = pBuf, .len = bufLen, .handle = NULL};
  pSend->requestId = pollReq.reqId;
  pSend->requestObjRefId = 0;
  pSend->param = pCbParam;
  pSend->fp = tmqPollCb;
  pSend->msgType = TDMT_VND_TMQ_CONSUME;

  int64_t transporterId = 0;
  char    offsetText[80];
  tFormatOffset(offsetText, tListLen(offsetText), &pVg->offsetInfo.currentOffset);

  tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, pTmq->consumerId,
           pTopic->topicName, pVg->vgId, pTmq->epoch, offsetText, pollReq.reqId);
  asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pSend);

  pVg->pollCnt++;
  pVg->seekUpdated = false;  // reset this flag.
  pTmq->pollCnt++;

  return TSDB_CODE_SUCCESS;

_free_param:
  taosMemoryFree(pCbParam);
_free_buf:
  taosMemoryFree(pBuf);
_fail:
  return handleErrorBeforePoll(pVg, pTmq);
}

1762
// broadcast the poll request to all related vnodes
H
Haojun Liao 已提交
1763
static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
1764 1765 1766
  if(atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER){
    return 0;
  }
1767
  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
X
Xiaoyu Wang 已提交
1768
  tscDebug("consumer:0x%" PRIx64 " start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics);
1769 1770

  for (int i = 0; i < numOfTopics; i++) {
X
Xiaoyu Wang 已提交
1771
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
X
Xiaoyu Wang 已提交
1772
    int32_t         numOfVg = taosArrayGetSize(pTopic->vgs);
1773 1774

    for (int j = 0; j < numOfVg; j++) {
X
Xiaoyu Wang 已提交
1775
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
X
Xiaoyu Wang 已提交
1776
      if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) {  // less than 100ms
1777
        tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId,
X
Xiaoyu Wang 已提交
1778
                 tmq->epoch, pVg->vgId);
H
Haojun Liao 已提交
1779 1780 1781
        continue;
      }

1782
      int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
1783
      if (vgStatus == TMQ_VG_STATUS__WAIT) {
L
Liu Jicong 已提交
1784
        int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
1785
        tscTrace("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch,
X
Xiaoyu Wang 已提交
1786
                 pVg->vgId, vgSkipCnt);
X
Xiaoyu Wang 已提交
1787 1788
        continue;
      }
1789

L
Liu Jicong 已提交
1790
      atomic_store_32(&pVg->vgSkipCnt, 0);
1791 1792 1793
      int32_t code = doTmqPollImpl(tmq, pTopic, pVg, timeout);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
D
dapan1121 已提交
1794
      }
X
Xiaoyu Wang 已提交
1795 1796
    }
  }
1797

1798
  tscDebug("consumer:0x%" PRIx64 " end to poll data", tmq->consumerId);
X
Xiaoyu Wang 已提交
1799 1800 1801
  return 0;
}

H
Haojun Liao 已提交
1802
/*
 * Handles a queued response that is not a poll response.
 *
 * Only endpoint responses (TMQ_MSG_TYPE__EP_RSP) are accepted. When the response carries
 * an epoch newer than the consumer's, the local endpoint info is updated from it and
 * *pReset is set to true; otherwise the wrapper's content is released and *pReset is set
 * to false.
 *
 * Returns 0 for an endpoint response, or -1 for any other type (in which case *pReset is
 * left untouched).
 */
static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType != TMQ_MSG_TYPE__EP_RSP) {
    return -1;
  }

  bool isNewer = rspWrapper->epoch > atomic_load_32(&tmq->epoch);
  if (isNewer) {
    SMqAskEpRsp* pEpRsp = &((SMqAskEpRspWrapper*)rspWrapper)->msg;
    doUpdateLocalEp(tmq, rspWrapper->epoch, pEpRsp);
    tDeleteSMqAskEpRsp(pEpRsp);
  } else {
    tmqFreeRspWrapper(rspWrapper);
  }

  *pReset = isNewer;
  return 0;
}

H
Haojun Liao 已提交
1822
static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
H
Haojun Liao 已提交
1823
  tscDebug("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, tmq->qall->numOfItems);
1824

X
Xiaoyu Wang 已提交
1825
  while (1) {
1826 1827
    SMqRspWrapper* pRspWrapper = NULL;
    taosGetQitem(tmq->qall, (void**)&pRspWrapper);
1828

1829
    if (pRspWrapper == NULL) {
L
Liu Jicong 已提交
1830
      taosReadAllQitems(tmq->mqueue, tmq->qall);
1831 1832
      taosGetQitem(tmq->qall, (void**)&pRspWrapper);
      if (pRspWrapper == NULL) {
L
Liu Jicong 已提交
1833 1834
        return NULL;
      }
X
Xiaoyu Wang 已提交
1835 1836
    }

X
Xiaoyu Wang 已提交
1837
    tscDebug("consumer:0x%" PRIx64 " handle rsp, type:%d", tmq->consumerId, pRspWrapper->tmqRspType);
H
Haojun Liao 已提交
1838

1839 1840
    if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
      taosFreeQitem(pRspWrapper);
L
Liu Jicong 已提交
1841
      terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
H
Haojun Liao 已提交
1842
      tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId, tstrerror(terrno));
L
Liu Jicong 已提交
1843
      return NULL;
1844 1845
    } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
H
Haojun Liao 已提交
1846

X
Xiaoyu Wang 已提交
1847
      int32_t     consumerEpoch = atomic_load_32(&tmq->epoch);
1848 1849 1850
      SMqDataRsp* pDataRsp = &pollRspWrapper->dataRsp;

      if (pDataRsp->head.epoch == consumerEpoch) {
wmmhello's avatar
wmmhello 已提交
1851 1852 1853 1854 1855 1856 1857 1858
        SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
        pollRspWrapper->vgHandle = pVg;
        pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName);
        if(pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL){
          tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
                   pollRspWrapper->topicName, pollRspWrapper->vgId);
          return NULL;
        }
1859 1860 1861 1862
        // update the epset
        if (pollRspWrapper->pEpset != NULL) {
          SEp* pEp = GET_ACTIVE_EP(pollRspWrapper->pEpset);
          SEp* pOld = GET_ACTIVE_EP(&(pVg->epSet));
X
Xiaoyu Wang 已提交
1863 1864
          tscDebug("consumer:0x%" PRIx64 " update epset vgId:%d, ep:%s:%d, old ep:%s:%d", tmq->consumerId, pVg->vgId,
                   pEp->fqdn, pEp->port, pOld->fqdn, pOld->port);
1865 1866 1867
          pVg->epSet = *pollRspWrapper->pEpset;
        }

1868 1869 1870
        // update the local offset value only for the returned values, only when the local offset is NOT updated
        // by tmq_offset_seek function
        if (!pVg->seekUpdated) {
T
t_max 已提交
1871
          tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", tmq->consumerId);
1872
          pVg->offsetInfo.currentOffset = pDataRsp->rspOffset;
T
t_max 已提交
1873 1874
        } else {
          tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", tmq->consumerId);
wmmhello's avatar
wmmhello 已提交
1875
        }
1876 1877

        // update the status
X
Xiaoyu Wang 已提交
1878
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
H
Haojun Liao 已提交
1879

1880 1881 1882
        // update the valid wal version range
        pVg->offsetInfo.walVerBegin = pDataRsp->head.walsver;
        pVg->offsetInfo.walVerEnd = pDataRsp->head.walever;
1883
        pVg->receivedInfoFromVnode = true;
1884

1885 1886 1887
        char buf[80];
        tFormatOffset(buf, 80, &pDataRsp->rspOffset);
        if (pDataRsp->blockNum == 0) {
X
Xiaoyu Wang 已提交
1888 1889 1890
          tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64
                   " total:%" PRId64 " reqId:0x%" PRIx64,
                   tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId);
1891
          pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
1892
          pVg->emptyBlockReceiveTs = taosGetTimestampMs();
L
Liu Jicong 已提交
1893
          taosFreeQitem(pollRspWrapper);
1894
        } else {  // build rsp
X
Xiaoyu Wang 已提交
1895
          int64_t    numOfRows = 0;
1896
          SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
H
Haojun Liao 已提交
1897
          tmq->totalRows += numOfRows;
1898
          pVg->emptyBlockReceiveTs = 0;
H
Haojun Liao 已提交
1899
          tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
H
Haojun Liao 已提交
1900
                   " vg total:%" PRId64 " total:%" PRId64 ", reqId:0x%" PRIx64,
H
Haojun Liao 已提交
1901
                   tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, tmq->totalRows,
H
Haojun Liao 已提交
1902
                   pollRspWrapper->reqId);
1903 1904 1905
          taosFreeQitem(pollRspWrapper);
          return pRsp;
        }
X
Xiaoyu Wang 已提交
1906
      } else {
H
Haojun Liao 已提交
1907
        tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
1908
                 tmq->consumerId, pollRspWrapper->vgId, pDataRsp->head.epoch, consumerEpoch);
1909
        pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
L
Liu Jicong 已提交
1910 1911
        taosFreeQitem(pollRspWrapper);
      }
1912
    } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
1913
      // todo handle the wal range and epset for each vgroup
1914
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
L
Liu Jicong 已提交
1915
      int32_t            consumerEpoch = atomic_load_32(&tmq->epoch);
1916 1917 1918

      tscDebug("consumer:0x%" PRIx64 " process meta rsp", tmq->consumerId);

L
Liu Jicong 已提交
1919
      if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) {
wmmhello's avatar
wmmhello 已提交
1920 1921 1922 1923 1924 1925 1926 1927
        SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
        pollRspWrapper->vgHandle = pVg;
        pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName);
        if(pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL){
          tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
                   pollRspWrapper->topicName, pollRspWrapper->vgId);
          return NULL;
        }
H
Haojun Liao 已提交
1928

1929
        if(pollRspWrapper->metaRsp.rspOffset.type != 0){    // if offset is validate
H
Haojun Liao 已提交
1930
          pVg->offsetInfo.currentOffset = pollRspWrapper->metaRsp.rspOffset;
1931
        }
1932

L
Liu Jicong 已提交
1933 1934
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        // build rsp
L
Liu Jicong 已提交
1935
        SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
L
Liu Jicong 已提交
1936 1937 1938
        taosFreeQitem(pollRspWrapper);
        return pRsp;
      } else {
H
Haojun Liao 已提交
1939
        tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
1940
                 tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
1941
        pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
L
Liu Jicong 已提交
1942
        taosFreeQitem(pollRspWrapper);
X
Xiaoyu Wang 已提交
1943
      }
1944 1945
    } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) {
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
X
Xiaoyu Wang 已提交
1946
      int32_t            consumerEpoch = atomic_load_32(&tmq->epoch);
H
Haojun Liao 已提交
1947

L
Liu Jicong 已提交
1948
      if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) {
wmmhello's avatar
wmmhello 已提交
1949 1950 1951 1952 1953 1954 1955 1956
        SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
        pollRspWrapper->vgHandle = pVg;
        pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName);
        if(pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL){
          tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
                   pollRspWrapper->topicName, pollRspWrapper->vgId);
          return NULL;
        }
H
Haojun Liao 已提交
1957

T
t_max 已提交
1958 1959 1960 1961 1962 1963 1964 1965 1966
        // update the local offset value only for the returned values, only when the local offset is NOT updated
        // by tmq_offset_seek function
        if (!pVg->seekUpdated) {
          if(pollRspWrapper->taosxRsp.rspOffset.type != 0) {    // if offset is validate
            tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", tmq->consumerId);
            pVg->offsetInfo.currentOffset = pollRspWrapper->taosxRsp.rspOffset;
          }
        } else {
          tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", tmq->consumerId);
1967
        }
1968

L
Liu Jicong 已提交
1969
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
H
Haojun Liao 已提交
1970

L
Liu Jicong 已提交
1971
        if (pollRspWrapper->taosxRsp.blockNum == 0) {
H
Haojun Liao 已提交
1972 1973
          tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 " reqId:0x%" PRIx64,
                   tmq->consumerId, pVg->vgId, pVg->numOfRows, pollRspWrapper->reqId);
H
Haojun Liao 已提交
1974
          pVg->emptyBlockReceiveTs = taosGetTimestampMs();
1975
          pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
H
Haojun Liao 已提交
1976
          taosFreeQitem(pollRspWrapper);
L
Liu Jicong 已提交
1977
          continue;
H
Haojun Liao 已提交
1978
        } else {
X
Xiaoyu Wang 已提交
1979
          pVg->emptyBlockReceiveTs = 0;  // reset the ts
L
Liu Jicong 已提交
1980
        }
wmmhello's avatar
wmmhello 已提交
1981

L
Liu Jicong 已提交
1982
        // build rsp
X
Xiaoyu Wang 已提交
1983
        void*   pRsp = NULL;
1984
        int64_t numOfRows = 0;
L
Liu Jicong 已提交
1985
        if (pollRspWrapper->taosxRsp.createTableNum == 0) {
1986
          pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
L
Liu Jicong 已提交
1987
        } else {
wmmhello's avatar
wmmhello 已提交
1988 1989
          pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper);
        }
H
Haojun Liao 已提交
1990

1991 1992
        tmq->totalRows += numOfRows;

H
Haojun Liao 已提交
1993
        char buf[80];
H
Haojun Liao 已提交
1994
        tFormatOffset(buf, 80, &pVg->offsetInfo.currentOffset);
H
Haojun Liao 已提交
1995
        tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
X
Xiaoyu Wang 已提交
1996
                 ", vg total:%" PRId64 " total:%" PRId64 " reqId:0x%" PRIx64,
H
Haojun Liao 已提交
1997
                 tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows,
H
Haojun Liao 已提交
1998
                 tmq->totalRows, pollRspWrapper->reqId);
H
Haojun Liao 已提交
1999 2000

        taosFreeQitem(pollRspWrapper);
L
Liu Jicong 已提交
2001
        return pRsp;
H
Haojun Liao 已提交
2002

L
Liu Jicong 已提交
2003
      } else {
H
Haojun Liao 已提交
2004
        tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
2005
                 tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
2006
        pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
L
Liu Jicong 已提交
2007 2008
        taosFreeQitem(pollRspWrapper);
      }
X
Xiaoyu Wang 已提交
2009
    } else {
H
Haojun Liao 已提交
2010 2011
      tscDebug("consumer:0x%" PRIx64 " not data msg received", tmq->consumerId);

X
Xiaoyu Wang 已提交
2012
      bool reset = false;
2013 2014
      tmqHandleNoPollRsp(tmq, pRspWrapper, &reset);
      taosFreeQitem(pRspWrapper);
X
Xiaoyu Wang 已提交
2015
      if (pollIfReset && reset) {
2016
        tscDebug("consumer:0x%" PRIx64 ", reset and repoll", tmq->consumerId);
2017
        tmqPollImpl(tmq, timeout);
X
Xiaoyu Wang 已提交
2018 2019 2020 2021 2022
      }
    }
  }
}

2023
/*
 * Polls the consumer for the next result.
 *
 * tmq:     consumer handle.
 * timeout: maximum time to wait in milliseconds; a negative value waits without a
 *          deadline, re-checking once per second.
 *
 * Returns a result object, or NULL when
 *   - the consumer is still in the INIT state (after sleeping 500 ms),
 *   - the consumer stays "not ready" for more than 40 retries while recovering,
 *   - terrno equals TSDB_CODE_TQ_NO_COMMITTED_OFFSET after handling responses, or
 *   - the timeout elapses without a response.
 */
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
  int64_t beginTs = taosGetTimestampMs();

  tscDebug("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, beginTs,
           timeout);

  // in no topic status, delayed task also need to be processed
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
    tscDebug("consumer:0x%" PRIx64 " poll return since consumer is init", tmq->consumerId);
    taosMsleep(500);  //     sleep for a while
    return NULL;
  }

  // while recovering, keep asking for the endpoint until the consumer is ready
  while (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
    for (int32_t attempt = 0; doAskEp(tmq) == TSDB_CODE_MND_CONSUMER_NOT_READY;) {
      if (attempt > 40) {
        return NULL;
      }
      attempt++;

      tscDebug("consumer:0x%" PRIx64 " not ready, retry:%d/40 in 500ms", tmq->consumerId, attempt);
      taosMsleep(500);
    }
  }

  for (;;) {
    tmqHandleAllDelayedTask(tmq);

    // a poll failure is only logged; queued responses are still processed below
    if (tmqPollImpl(tmq, timeout) < 0) {
      tscDebug("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId);
    }

    void* pRes = tmqHandleAllRsp(tmq, timeout, false);
    if (pRes != NULL) {
      tscDebug("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, pRes);
      return (TAOS_RES*)pRes;
    }

    if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      tscDebug("consumer:0x%" PRIx64 " return null since no committed offset", tmq->consumerId);
      return NULL;
    }

    if (timeout < 0) {
      // use tsem_timewait instead of tsem_wait to avoid unexpected stuck
      tsem_timewait(&tmq->rspSem, 1000);
      continue;
    }

    int64_t nowTs = taosGetTimestampMs();
    int64_t spent = nowTs - beginTs;
    if (spent > timeout) {
      tscDebug("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
               tmq->consumerId, tmq->epoch, beginTs, nowTs);
      return NULL;
    }

    // wait for a response for whatever is left of the timeout
    tsem_timewait(&tmq->rspSem, (timeout - spent));
  }
}

2081 2082 2083 2084 2085 2086 2087 2088 2089 2090 2091 2092 2093 2094
/*
 * Writes the consumer's totals (poll count, rows, topic count, epoch) and the per-vgroup
 * row counts of every topic to the debug log.
 */
static void displayConsumeStatistics(const tmq_t* pTmq) {
  int32_t topicCnt = taosArrayGetSize(pTmq->clientTopics);

  tscDebug("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d",
           pTmq->consumerId, pTmq->pollCnt, pTmq->totalRows, topicCnt, pTmq->epoch);
  tscDebug("consumer:0x%" PRIx64 " rows dist begin: ", pTmq->consumerId);

  for (int32_t topicIdx = 0; topicIdx < topicCnt; ++topicIdx) {
    SMqClientTopic* pTopic = taosArrayGet(pTmq->clientTopics, topicIdx);
    tscDebug("consumer:0x%" PRIx64 " topic:%d", pTmq->consumerId, topicIdx);

    int32_t vgCnt = taosArrayGetSize(pTopic->vgs);
    for (int32_t vgIdx = 0; vgIdx < vgCnt; ++vgIdx) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, vgIdx);
      tscDebug("topic:%s, %d. vgId:%d rows:%" PRId64, pTopic->topicName, vgIdx, pVg->vgId, pVg->numOfRows);
    }
  }

  tscDebug("consumer:0x%" PRIx64 " rows dist end", pTmq->consumerId);
}
2100

2101
/*
 * Closes a consumer.
 *
 * When the consumer is in the READY state, pending offsets are committed first (only if
 * auto commit is enabled) and the consumer then subscribes to an empty topic list,
 * retrying while the server answers "consumer not ready". In any other state the consumer
 * is closed directly. Finally the consumer's reference is removed.
 *
 * Returns 0 on success, or the non-zero code of the synchronous commit if that commit
 * fails; in that case the reference is not removed.
 */
int32_t tmq_consumer_close(tmq_t* tmq) {
  tscDebug("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status);
  displayConsumeStatistics(tmq);

  if (tmq->status != TMQ_CONSUMER_STATUS__READY) {
    tscWarn("consumer:0x%" PRIx64 " not in ready state, close it directly", tmq->consumerId);
  } else {
    // if auto commit is set, commit before close consumer. Otherwise, do nothing.
    if (tmq->autoCommit) {
      int32_t commitCode = tmq_commit_sync(tmq, NULL);
      if (commitCode != 0) {
        return commitCode;
      }
    }

    // subscribing to an empty list drops every subscription of this consumer
    tmq_list_t* pEmptyList = tmq_list_new();
    for (int32_t retries = 0;; ++retries) {
      int32_t subCode = tmq_subscribe(tmq, pEmptyList);
      if (subCode != TSDB_CODE_MND_CONSUMER_NOT_READY || retries > 5) {
        break;
      }
      taosMsleep(500);
    }
    tmq_list_destroy(pEmptyList);
  }

  taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
  return 0;
}
L
Liu Jicong 已提交
2134

L
Liu Jicong 已提交
2135 2136
/*
 * Maps a tmq error code to a text: 0 -> "success", -1 -> "fail", anything else is
 * delegated to tstrerror().
 */
const char* tmq_err2str(int32_t err) {
  switch (err) {
    case 0:
      return "success";
    case -1:
      return "fail";
    default:
      return tstrerror(err);
  }
}
L
Liu Jicong 已提交
2144

L
Liu Jicong 已提交
2145 2146 2147 2148 2149
/*
 * Classifies a result object: data, table meta or metadata. Any other object yields
 * TMQ_RES_INVALID.
 */
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return TMQ_RES_DATA;
  }
  if (TD_RES_TMQ_META(res)) {
    return TMQ_RES_TABLE_META;
  }
  if (TD_RES_TMQ_METADATA(res)) {
    return TMQ_RES_METADATA;
  }
  return TMQ_RES_INVALID;
}

L
Liu Jicong 已提交
2157
/*
 * Returns the topic name of a tmq result object.
 *
 * The stored topic name is expected to contain a '.'-terminated prefix; the returned
 * pointer refers to the text after the first '.'. If the stored name contains no '.',
 * the whole stored name is returned instead of an invalid pointer.
 *
 * Returns NULL when `res` is NULL or is not a tmq data/meta/metadata result. The returned
 * pointer refers to memory inside `res`.
 */
const char* tmq_get_topic_name(TAOS_RES* res) {
  if (res == NULL) {
    return NULL;
  }

  const char* fullName = NULL;
  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->topic;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->topic;
  } else if (TD_RES_TMQ_METADATA(res)) {
    fullName = ((SMqTaosxRspObj*)res)->topic;
  } else {
    return NULL;
  }

  // strchr() yields NULL when there is no '.'; adding 1 to that would be an invalid pointer
  const char* dot = strchr(fullName, '.');
  return (dot != NULL) ? dot + 1 : fullName;
}

L
Liu Jicong 已提交
2172 2173 2174 2175
/*
 * Returns the database name of a tmq result object.
 *
 * The stored db name is expected to contain a '.'-terminated prefix; the returned pointer
 * refers to the text after the first '.'. If the stored name contains no '.', the whole
 * stored name is returned instead of an invalid pointer.
 *
 * Returns NULL when `res` is NULL or is not a tmq data/meta/metadata result. The returned
 * pointer refers to memory inside `res`.
 */
const char* tmq_get_db_name(TAOS_RES* res) {
  if (res == NULL) {
    return NULL;
  }

  const char* fullName = NULL;
  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->db;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->db;
  } else if (TD_RES_TMQ_METADATA(res)) {
    fullName = ((SMqTaosxRspObj*)res)->db;
  } else {
    return NULL;
  }

  // strchr() yields NULL when there is no '.'; adding 1 to that would be an invalid pointer
  const char* dot = strchr(fullName, '.');
  return (dot != NULL) ? dot + 1 : fullName;
}

L
Liu Jicong 已提交
2187 2188 2189 2190
/*
 * Returns the vgroup id stored in a tmq result object, or -1 when `res` is not a tmq
 * data/meta/metadata result.
 */
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  int32_t vgId = -1;

  if (TD_RES_TMQ(res)) {
    vgId = ((SMqRspObj*)res)->vgId;
  } else if (TD_RES_TMQ_META(res)) {
    vgId = ((SMqMetaRspObj*)res)->vgId;
  } else if (TD_RES_TMQ_METADATA(res)) {
    vgId = ((SMqTaosxRspObj*)res)->vgId;
  }

  return vgId;
}
L
Liu Jicong 已提交
2201

2202 2203 2204 2205 2206 2207 2208 2209 2210 2211 2212 2213 2214 2215 2216 2217 2218 2219 2220 2221 2222 2223 2224
/*
 * Returns the version carried by the response offset of a tmq result object.
 *
 * The version is only returned when the offset type is TMQ_OFFSET__LOG; for any other
 * offset type, and for objects that are not tmq results, -1 is returned.
 */
int64_t tmq_get_vgroup_offset(TAOS_RES* res) {
  const STqOffsetVal* pOffset = NULL;

  if (TD_RES_TMQ(res)) {
    pOffset = &((SMqRspObj*)res)->rsp.rspOffset;
  } else if (TD_RES_TMQ_META(res)) {
    pOffset = &((SMqMetaRspObj*)res)->metaRsp.rspOffset;
  } else if (TD_RES_TMQ_METADATA(res)) {
    pOffset = &((SMqTaosxRspObj*)res)->rsp.rspOffset;
  }

  if (pOffset != NULL && pOffset->type == TMQ_OFFSET__LOG) {
    return pOffset->version;
  }

  // data from tsdb, no valid offset info
  return -1;
}

L
Liu Jicong 已提交
2225 2226 2227 2228 2229 2230 2231
/*
 * Get the table name of the block the result iterator currently points at.
 *
 * A name is only available for TMQ data and metadata results that were
 * produced with table names (`withTbName`), have a `blockTbName` array, and
 * whose `resIter` lies inside [0, blockNum).
 *
 * @param res  result handle
 * @return the entry of `blockTbName` at index `resIter`, or NULL otherwise
 */
const char* tmq_get_table_name(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pData = (SMqRspObj*)res;

    bool hasNames = pData->rsp.withTbName && pData->rsp.blockTbName != NULL;
    bool inRange = pData->resIter >= 0 && pData->resIter < pData->rsp.blockNum;
    if (hasNames && inRange) {
      return (const char*)taosArrayGetP(pData->rsp.blockTbName, pData->resIter);
    }
    return NULL;
  }

  if (TD_RES_TMQ_METADATA(res)) {
    SMqTaosxRspObj* pTaosx = (SMqTaosxRspObj*)res;

    bool hasNames = pTaosx->rsp.withTbName && pTaosx->rsp.blockTbName != NULL;
    bool inRange = pTaosx->resIter >= 0 && pTaosx->resIter < pTaosx->rsp.blockNum;
    if (hasNames && inRange) {
      return (const char*)taosArrayGetP(pTaosx->rsp.blockTbName, pTaosx->resIter);
    }
    return NULL;
  }

  return NULL;
}
2243

2244 2245 2246 2247
/*
 * Start an asynchronous offset commit.
 *
 * @param tmq    consumer handle
 * @param pRes   result whose offset is committed; NULL commits all offsets
 * @param cb     callback handed to the commit routine
 * @param param  opaque user value handed to the commit routine with `cb`
 */
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* param) {
  if (pRes != NULL) {
    // only commit the offset that belongs to this result
    asyncCommitOffset(tmq, pRes, TDMT_VND_TMQ_COMMIT_OFFSET, cb, param);
    return;
  }

  // no result given: commit every offset
  asyncCommitAllOffsets(tmq, cb, param);
}

2252
// Commit callback used by the synchronous wrappers: stores the result code in
// the SSyncCommitInfo passed as `param` and posts its semaphore.
static void commitCallBackFn(tmq_t *UNUSED_PARAM(tmq), int32_t code, void* param) {
  SSyncCommitInfo* pSync = param;

  pSync->code = code;
  tsem_post(&pSync->sem);
}
2257

2258 2259 2260 2261 2262 2263 2264 2265 2266
/*
 * Commit offsets and block until the commit callback reports the result.
 *
 * The commit is started through the asynchronous commit routines with
 * commitCallBackFn as the callback; this function then waits on a semaphore
 * that the callback posts.
 *
 * @param tmq   consumer handle
 * @param pRes  result whose offset is committed; NULL commits all offsets
 * @return the code delivered to the commit callback, or
 *         TSDB_CODE_OUT_OF_MEMORY when the wait context cannot be allocated
 */
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
  int32_t code = 0;

  SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
  if (pInfo == NULL) {
    // previously the NULL pointer was passed straight to tsem_init()
    tscError("consumer:0x%" PRIx64 " failed to prepare commit operation", tmq->consumerId);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tsem_init(&pInfo->sem, 0, 0);
  pInfo->code = 0;

  if (pRes == NULL) {
    asyncCommitAllOffsets(tmq, commitCallBackFn, pInfo);
  } else {
    asyncCommitOffset(tmq, pRes, TDMT_VND_TMQ_COMMIT_OFFSET, commitCallBackFn, pInfo);
  }

  // wait for commitCallBackFn to store the code and post the semaphore
  tsem_wait(&pInfo->sem);
  code = pInfo->code;

  tsem_destroy(&pInfo->sem);
  taosMemoryFree(pInfo);

  tscDebug("consumer:0x%" PRIx64 " sync commit done, code:%s", tmq->consumerId, tstrerror(code));
  return code;
}

/*
 * Ask-ep callback used by doAskEp().
 *
 * Records `code` in the SAskEpInfo passed as `param`. On success the ask-ep
 * response that follows the SMqRspHead in the message is decoded and applied
 * through doUpdateLocalEp(). The semaphore in the SAskEpInfo is posted in
 * every case.
 */
void updateEpCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) {
  SAskEpInfo* pAskInfo = param;
  pAskInfo->code = code;

  if (code != TSDB_CODE_SUCCESS) {
    tsem_post(&pAskInfo->sem);
    return;
  }

  SMqRspHead* pHead = pDataBuf->pData;

  // the encoded response starts right after the response head
  SMqAskEpRsp epRsp;
  tDecodeSMqAskEpRsp(POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead)), &epRsp);

  doUpdateLocalEp(pTmq, pHead->epoch, &epRsp);
  tDeleteSMqAskEpRsp(&epRsp);

  tsem_post(&pAskInfo->sem);
}

/*
 * Ask-ep callback that forwards a successful response to the consumer queue.
 *
 * On failure `terrno` is set (to `code`, or to TSDB_CODE_OUT_OF_MEMORY when
 * the queue item cannot be allocated) and nothing is queued. On success a
 * SMqAskEpRspWrapper holding the decoded response is written to
 * `pTmq->mqueue`.
 */
void addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) {
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return;
  }

  SMqAskEpRspWrapper* pItem = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM, 0);
  if (pItem == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return;
  }

  SMqRspHead* pHead = pDataBuf->pData;
  pItem->epoch = pHead->epoch;
  pItem->tmqRspType = TMQ_MSG_TYPE__EP_RSP;

  // NOTE(review): the raw response head is copied into `msg` before `msg` is
  // decoded into; whether the decode overwrites all of those bytes cannot be
  // seen from here.
  memcpy(&pItem->msg, pDataBuf->pData, sizeof(SMqRspHead));
  tDecodeSMqAskEpRsp(POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead)), &pItem->msg);

  taosWriteQitem(pTmq->mqueue, pItem);
}

/*
 * Ask for the endpoint information and block until the reply is processed.
 *
 * The request is issued through asyncAskEp() with updateEpCallbackFn as the
 * callback; this function waits on the semaphore that the callback posts.
 *
 * @param pTmq  consumer handle
 * @return the code stored by the callback, or TSDB_CODE_OUT_OF_MEMORY when
 *         the wait context cannot be allocated
 */
int32_t doAskEp(tmq_t* pTmq) {
  SAskEpInfo* pInfo = taosMemoryMalloc(sizeof(SAskEpInfo));
  if (pInfo == NULL) {
    // previously the NULL pointer was passed straight to tsem_init()
    tscError("consumer:0x%" PRIx64 " failed to prepare ask ep operation", pTmq->consumerId);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tsem_init(&pInfo->sem, 0, 0);

  asyncAskEp(pTmq, updateEpCallbackFn, pInfo);
  tsem_wait(&pInfo->sem);

  int32_t code = pInfo->code;
  tsem_destroy(&pInfo->sem);
  taosMemoryFree(pInfo);
  return code;
}

/*
 * Build a TDMT_MND_TMQ_ASK_EP request and send it asynchronously.
 *
 * If the request cannot be built, `askEpFn` is invoked right away with an
 * error code and a NULL data buffer. Otherwise `askEpFn` and `param` are
 * stored in the SMqAskEpCbParam attached to the message, whose response
 * handler is askEpCallbackFn.
 *
 * @param pTmq     consumer handle
 * @param askEpFn  user callback
 * @param param    opaque value passed through to `askEpFn`
 */
void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param) {
  SMqAskEpReq askReq = {.consumerId = pTmq->consumerId, .epoch = pTmq->epoch};
  strcpy(askReq.cgroup, pTmq->groupId);

  // first pass only computes the encoded size
  int32_t reqLen = tSerializeSMqAskEpReq(NULL, 0, &askReq);
  if (reqLen < 0) {
    tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", pTmq->consumerId);
    askEpFn(pTmq, TSDB_CODE_INVALID_PARA, NULL, param);
    return;
  }

  void* pBuf = taosMemoryCalloc(1, reqLen);
  if (pBuf == NULL) {
    tscError("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", pTmq->consumerId, reqLen);
    askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param);
    return;
  }

  if (tSerializeSMqAskEpReq(pBuf, reqLen, &askReq) < 0) {
    tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", pTmq->consumerId, reqLen);
    taosMemoryFree(pBuf);

    askEpFn(pTmq, TSDB_CODE_INVALID_PARA, NULL, param);
    return;
  }

  SMqAskEpCbParam* pCbParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
  if (pCbParam == NULL) {
    tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", pTmq->consumerId);
    taosMemoryFree(pBuf);

    askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param);
    return;
  }

  pCbParam->pUserFn = askEpFn;
  pCbParam->pParam = param;
  pCbParam->refId = pTmq->refId;
  pCbParam->epoch = pTmq->epoch;

  SMsgSendInfo* pSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pSendInfo == NULL) {
    taosMemoryFree(pCbParam);
    taosMemoryFree(pBuf);
    askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param);
    return;
  }

  pSendInfo->msgInfo = (SDataBuf){.pData = pBuf, .len = reqLen, .handle = NULL};
  pSendInfo->requestId = generateRequestId();
  pSendInfo->requestObjRefId = 0;
  pSendInfo->msgType = TDMT_MND_TMQ_ASK_EP;
  pSendInfo->fp = askEpCallbackFn;
  pSendInfo->param = pCbParam;

  SEpSet mgmtEpSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp);
  tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, reqId:0x%" PRIx64, pTmq->consumerId, pSendInfo->requestId);

  int64_t transporterId = 0;
  asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &mgmtEpSet, &transporterId, pSendInfo);
}

/*
 * Write the key "<topicName>:<vg>" into `dst`.
 *
 * The write is not bounded: `dst` must be large enough for the topic name,
 * the ':' separator, the decimal vgroup id and the terminating NUL.
 *
 * @return the value returned by sprintf(), i.e. the number of characters
 *         written, not counting the terminating NUL
 */
int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg) {
  const int32_t written = sprintf(dst, "%s:%d", topicName, vg);
  return written;
}

/*
 * Finish a commit: invoke the user callback stored in the parameter set and
 * free the set.
 *
 * The consumer is looked up by the reference id stored in the set. When it
 * can no longer be acquired, the set is freed, `terrno` is set to
 * TSDB_CODE_TMQ_CONSUMER_CLOSED and the callback is NOT invoked.
 *
 * @return 0 when the callback was invoked, -1 when the consumer is gone
 */
int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
  const int64_t consumerRef = pParamSet->refId;

  tmq_t* pConsumer = taosAcquireRef(tmqMgmt.rsetId, consumerRef);
  if (pConsumer != NULL) {
    // no more waiting rsp: report the accumulated code to the user
    pParamSet->callbackFn(pConsumer, pParamSet->code, pParamSet->userParam);
    taosMemoryFree(pParamSet);

    taosReleaseRef(tmqMgmt.rsetId, consumerRef);
    return 0;
  }

  taosMemoryFree(pParamSet);
  terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
  return -1;
}

2419
/*
 * Account for one received commit response.
 *
 * Atomically decrements `waitingRspNum` of the parameter set; the response
 * that brings it to zero completes the commit through tmqCommitDone().
 *
 * @param pParamSet   shared state of the commit
 * @param consumerId  used for logging only
 * @param pTopic      used for logging only
 * @param vgId        used for logging only
 */
void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) {
  int32_t remaining = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
  if (remaining != 0) {
    tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d commit-rsp received, remain:%d", consumerId, pTopic, vgId,
             remaining);
    return;
  }

  tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d all commit-rsp received, commit completed", consumerId, pTopic,
           vgId);
  tmqCommitDone(pParamSet);
}
2430 2431 2432 2433 2434 2435 2436 2437 2438 2439 2440 2441 2442 2443 2444 2445 2446 2447 2448 2449 2450 2451

/*
 * Advance the result iterator to the next data block and load it.
 *
 * `resIter` is incremented first. If it still indexes a block, the block is
 * loaded into the result info of the response object; when the response
 * carries schemas, the schema of that block is installed and the row, column,
 * length and conversion buffers of the result info are released beforehand.
 *
 * @param res          result handle, treated as a SMqRspObj
 * @param convertUcs4  forwarded to setQueryResultFromRsp()
 * @return the result info of the response object, or NULL when there are no
 *         more blocks
 */
SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) {
  SMqRspObj* pObj = (SMqRspObj*)res;
  pObj->resIter++;

  if (pObj->resIter >= pObj->rsp.blockNum) {
    return NULL;
  }

  SReqResultInfo*    pResInfo = &pObj->resInfo;
  SRetrieveTableRsp* pBlock = (SRetrieveTableRsp*)taosArrayGetP(pObj->rsp.blockData, pObj->resIter);

  if (pObj->rsp.withSchema) {
    SSchemaWrapper* pSchema = (SSchemaWrapper*)taosArrayGetP(pObj->rsp.blockSchema, pObj->resIter);
    setResSchemaInfo(&pObj->resInfo, pSchema->pSchema, pSchema->nCols);

    // drop the buffers that belonged to the previous block
    taosMemoryFreeClear(pObj->resInfo.row);
    taosMemoryFreeClear(pObj->resInfo.pCol);
    taosMemoryFreeClear(pObj->resInfo.length);
    taosMemoryFreeClear(pObj->resInfo.convertBuf);
    taosMemoryFreeClear(pObj->resInfo.convertJson);
  }

  setQueryResultFromRsp(pResInfo, pBlock, convertUcs4, false);
  return pResInfo;
}

2454 2455 2456 2457 2458 2459 2460 2461 2462 2463 2464 2465 2466 2467 2468 2469 2470 2471 2472 2473 2474
/*
 * Response handler for a TDMT_VND_TMQ_VG_WALINFO request.
 *
 * Every response increments the shared response counter. A failed response
 * stores its code in the shared state; a successful one is decoded into a
 * tmq_topic_assignment and appended to the shared list under the shared
 * mutex. The response that brings the counter up to `totalReq` posts the
 * shared semaphore. The per-request parameter is freed before returning.
 *
 * @return always 0
 */
static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqVgWalInfoParam* pReqParam = param;
  SMqVgCommon*       pShared = pReqParam->pCommon;

  int32_t numOfRsp = atomic_add_fetch_32(&pShared->numOfRsp, 1);

  if (code == TSDB_CODE_SUCCESS) {
    SMqRspHead* pHead = pMsg->pData;

    // the encoded data response follows the response head
    SMqDataRsp rsp;
    SDecoder   decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeMqDataRsp(&decoder, &rsp);
    tDecoderClear(&decoder);

    tmq_topic_assignment assignment = {.begin = pHead->walsver,
                                       .end = pHead->walever,
                                       .currentOffset = rsp.rspOffset.version,
                                       .vgId = pReqParam->vgId};

    taosThreadMutexLock(&pShared->mutex);
    taosArrayPush(pShared->pList, &assignment);
    taosThreadMutexUnlock(&pShared->mutex);
  } else {
    tscError("consumer:0x%" PRIx64 " failed to get the wal info from vgId:%d for topic:%s", pShared->consumerId,
             pReqParam->vgId, pShared->pTopicName);
    pShared->code = code;
  }

  // last expected response: wake up the waiting caller
  if (numOfRsp == pReqParam->totalReq) {
    tsem_post(&pShared->rsp);
  }

  taosMemoryFree(pReqParam);
  return 0;
}

// Release everything owned by a SMqVgCommon (topic name copy, assignment list,
// semaphore and mutex) and then the object itself.
static void destroyCommonInfo(SMqVgCommon* pCommon) {
  taosMemoryFree(pCommon->pTopicName);
  taosArrayDestroy(pCommon->pList);

  tsem_destroy(&pCommon->rsp);
  taosThreadMutexDestroy(&pCommon->mutex);

  taosMemoryFree(pCommon);
}

H
Haojun Liao 已提交
2498
// Free a (non-NULL) assignment array that will not be handed to the caller and
// reset both output arguments.
static void tmqDiscardAssignment(tmq_topic_assignment** assignment, int32_t* numOfAssignment) {
  taosMemoryFree(*assignment);
  *assignment = NULL;
  *numOfAssignment = 0;
}

/*
 * Get the vgroup assignment (wal range and current offset) of a topic.
 *
 * If every vgroup of the topic is already flagged `receivedInfoFromVnode`,
 * the locally cached offset info is returned. Otherwise a
 * TDMT_VND_TMQ_VG_WALINFO request is sent to every vgroup, the call blocks
 * until all responses have arrived, and the local offset info is refreshed
 * from the responses.
 *
 * @param tmq              consumer handle
 * @param pTopicName       topic name without the account prefix
 * @param assignment       out: allocated array, to be released with
 *                         tmq_free_assignment(); NULL on failure
 * @param numOfAssignment  out: number of entries in `*assignment`; 0 on failure
 * @return TSDB_CODE_SUCCESS or an error code. On every error return the
 *         outputs are reset to NULL / 0 and nothing is left for the caller
 *         to free.
 */
int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_assignment** assignment,
                                 int32_t* numOfAssignment) {
  if (assignment == NULL || numOfAssignment == NULL) {
    tscError("invalid assignment output arguments, null");
    return TSDB_CODE_INVALID_PARA;
  }

  *numOfAssignment = 0;
  *assignment = NULL;

  if (tmq == NULL || pTopicName == NULL) {
    tscError("invalid tmq handle or topic name, null");
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t accId = tmq->pTscObj->acctId;
  char    tname[128] = {0};

  // bounded formatting: a long topic name used to overflow tname
  int32_t nameLen = snprintf(tname, sizeof(tname), "%d.%s", accId, pTopicName);
  if (nameLen < 0 || nameLen >= (int32_t)sizeof(tname)) {
    tscError("consumer:0x%" PRIx64 " topic name too long:%s", tmq->consumerId, pTopicName);
    return TSDB_CODE_INVALID_PARA;
  }

  SMqClientTopic* pTopic = getTopicByName(tmq, tname);
  if (pTopic == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  // in case of snapshot is opened, no valid offset will return
  int32_t numOfVgs = taosArrayGetSize(pTopic->vgs);
  *numOfAssignment = numOfVgs;

  *assignment = taosMemoryCalloc(numOfVgs, sizeof(tmq_topic_assignment));
  if (*assignment == NULL) {
    tscError("consumer:0x%" PRIx64 " failed to malloc buffer, size:%" PRIzu, tmq->consumerId,
             numOfVgs * sizeof(tmq_topic_assignment));
    *numOfAssignment = 0;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  bool needFetch = false;

  // fill the result from the cached info; stop at the first vgroup that has
  // not reported yet, in which case everything is fetched from the vnodes
  for (int32_t j = 0; j < numOfVgs; ++j) {
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
    if (!pClientVg->receivedInfoFromVnode) {
      needFetch = true;
      break;
    }

    tmq_topic_assignment* pAssignment = &(*assignment)[j];
    if (pClientVg->offsetInfo.currentOffset.type == TMQ_OFFSET__LOG) {
      pAssignment->currentOffset = pClientVg->offsetInfo.currentOffset.version;
    } else {
      pAssignment->currentOffset = 0;
    }

    pAssignment->begin = pClientVg->offsetInfo.walVerBegin;
    pAssignment->end = pClientVg->offsetInfo.walVerEnd;
    pAssignment->vgId = pClientVg->vgId;
  }

  if (!needFetch) {
    return TSDB_CODE_SUCCESS;
  }

  SMqVgCommon* pCommon = taosMemoryCalloc(1, sizeof(SMqVgCommon));
  if (pCommon == NULL) {
    tmqDiscardAssignment(assignment, numOfAssignment);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return terrno;
  }

  // NOTE(review): the results of taosArrayInit() and taosStrdup() are not checked.
  pCommon->pList = taosArrayInit(4, sizeof(tmq_topic_assignment));
  tsem_init(&pCommon->rsp, 0, 0);
  taosThreadMutexInit(&pCommon->mutex, 0);
  pCommon->pTopicName = taosStrdup(pTopic->topicName);
  pCommon->consumerId = tmq->consumerId;

  // NOTE(review): if building a request fails after earlier requests of this
  // loop were already sent, pCommon is destroyed while those requests still
  // reference it from tmqGetWalInfoCb(). This is unchanged from the previous
  // behaviour and needs a separate fix.
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  for (int32_t i = 0; i < numOfVgs; ++i) {
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);

    SMqVgWalInfoParam* pParam = taosMemoryMalloc(sizeof(SMqVgWalInfoParam));
    if (pParam == NULL) {
      destroyCommonInfo(pCommon);
      tmqDiscardAssignment(assignment, numOfAssignment);
      return terrno;
    }

    pParam->epoch = tmq->epoch;
    pParam->vgId = pClientVg->vgId;
    pParam->totalReq = numOfVgs;
    pParam->pCommon = pCommon;

    SMqPollReq req = {0};
    tmqBuildConsumeReqImpl(&req, tmq, 10, pTopic, pClientVg);

    int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
    if (msgSize < 0) {
      taosMemoryFree(pParam);
      destroyCommonInfo(pCommon);
      tmqDiscardAssignment(assignment, numOfAssignment);
      return terrno;
    }

    char* msg = taosMemoryCalloc(1, msgSize);
    if (NULL == msg) {
      taosMemoryFree(pParam);
      destroyCommonInfo(pCommon);
      tmqDiscardAssignment(assignment, numOfAssignment);
      return terrno;
    }

    if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
      taosMemoryFree(msg);
      taosMemoryFree(pParam);
      destroyCommonInfo(pCommon);
      tmqDiscardAssignment(assignment, numOfAssignment);
      return terrno;
    }

    SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
    if (sendInfo == NULL) {
      taosMemoryFree(pParam);
      taosMemoryFree(msg);
      destroyCommonInfo(pCommon);
      tmqDiscardAssignment(assignment, numOfAssignment);
      return terrno;
    }

    sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
    sendInfo->requestId = req.reqId;
    sendInfo->requestObjRefId = 0;
    sendInfo->param = pParam;
    sendInfo->fp = tmqGetWalInfoCb;
    sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO;

    int64_t transporterId = 0;
    char    offsetFormatBuf[80];
    tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.currentOffset);

    tscDebug("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64,
             tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
    asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, &transporterId, sendInfo);
  }

  // posted by the callback that handles the last expected response
  tsem_wait(&pCommon->rsp);
  int32_t code = pCommon->code;

  terrno = code;
  if (code != TSDB_CODE_SUCCESS) {
    tmqDiscardAssignment(assignment, numOfAssignment);
  } else {
    int32_t num = taosArrayGetSize(pCommon->pList);
    for (int32_t i = 0; i < num; ++i) {
      (*assignment)[i] = *(tmq_topic_assignment*)taosArrayGet(pCommon->pList, i);
    }
    *numOfAssignment = num;
  }

  // refresh the locally cached offset info of every vgroup that answered
  // (the loop is empty when the fetch failed)
  for (int32_t j = 0; j < (*numOfAssignment); ++j) {
    tmq_topic_assignment* p = &(*assignment)[j];

    for (int32_t i = 0; i < taosArrayGetSize(pTopic->vgs); ++i) {
      SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
      if (pClientVg->vgId != p->vgId) {
        continue;
      }

      SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo;

      pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG;
      pOffsetInfo->walVerBegin = p->begin;
      pOffsetInfo->walVerEnd = p->end;
      pOffsetInfo->currentOffset.version = p->currentOffset;
      pOffsetInfo->committedOffset.version = p->currentOffset;

      // format after the update so the log shows the new offset
      char offsetBuf[80] = {0};
      tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset);
      tscDebug("vgId:%d offset is update to:%s", p->vgId, offsetBuf);
    }
  }

  destroyCommonInfo(pCommon);
  return code;
}

T
t_max 已提交
2667 2668 2669 2670 2671 2672 2673 2674
/*
 * Release an assignment array returned by tmq_get_topic_assignment().
 *
 * @param pAssignment  array to free; NULL is accepted and ignored
 */
void tmq_free_assignment(tmq_topic_assignment* pAssignment) {
    if (pAssignment != NULL) {
        taosMemoryFree(pAssignment);
    }
}

2675
/*
 * Move the consume position of one vgroup of a topic to `offset`.
 *
 * The seek is only allowed while the current offset of the vgroup is a wal
 * version (TMQ_OFFSET__LOG) or a reset offset. For a wal-version offset the
 * target must lie inside [walVerBegin, walVerEnd]; the local offset is then
 * updated. In both cases a TDMT_VND_TMQ_SEEK_TO_OFFSET commit is sent and the
 * call blocks until its callback reports the result.
 *
 * @param tmq         consumer handle
 * @param pTopicName  topic name without the account prefix
 * @param vgId        vgroup to seek in
 * @param offset      target wal version
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for a bad argument or a
 *         seek that is not allowed, TSDB_CODE_OUT_OF_MEMORY, or the code
 *         delivered to the commit callback
 */
int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) {
  if (tmq == NULL) {
    tscError("invalid tmq handle, null");
    return TSDB_CODE_INVALID_PARA;
  }

  if (pTopicName == NULL) {
    tscError("consumer:0x%" PRIx64 " invalid topic name, null", tmq->consumerId);
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t accId = tmq->pTscObj->acctId;
  char    tname[128] = {0};

  // bounded formatting: a long topic name used to overflow tname
  int32_t nameLen = snprintf(tname, sizeof(tname), "%d.%s", accId, pTopicName);
  if (nameLen < 0 || nameLen >= (int32_t)sizeof(tname)) {
    tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
    return TSDB_CODE_INVALID_PARA;
  }

  SMqClientTopic* pTopic = getTopicByName(tmq, tname);
  if (pTopic == NULL) {
    tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
    return TSDB_CODE_INVALID_PARA;
  }

  SMqClientVg* pVg = NULL;
  int32_t      numOfVgs = taosArrayGetSize(pTopic->vgs);
  for (int32_t i = 0; i < numOfVgs; ++i) {
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
    if (pClientVg->vgId == vgId) {
      pVg = pClientVg;
      break;
    }
  }

  if (pVg == NULL) {
    tscError("consumer:0x%" PRIx64 " invalid vgroup id:%d", tmq->consumerId, vgId);
    return TSDB_CODE_INVALID_PARA;
  }

  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;

  int32_t type = pOffsetInfo->currentOffset.type;
  if (type != TMQ_OFFSET__LOG && !OFFSET_IS_RESET_OFFSET(type)) {
    tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type);
    return TSDB_CODE_INVALID_PARA;
  }

  if (type == TMQ_OFFSET__LOG && (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd)) {
    tscError("consumer:0x%" PRIx64 " invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]",
             tmq->consumerId, offset, pOffsetInfo->walVerBegin, pOffsetInfo->walVerEnd);
    return TSDB_CODE_INVALID_PARA;
  }

  // allocate the wait context before touching the local offset, so that an
  // allocation failure cannot leave a seek that was applied locally but never
  // sent to the vnode
  SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
  if (pInfo == NULL) {
    tscError("consumer:0x%"PRIx64" failed to prepare seek operation", tmq->consumerId);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // update the offset, and then commit to vnode
  if (pOffsetInfo->currentOffset.type == TMQ_OFFSET__LOG) {
    pOffsetInfo->currentOffset.version = offset;
    pOffsetInfo->committedOffset.version = INT64_MIN;
    pVg->seekUpdated = true;
  }

  // stack-allocated stand-in result that identifies the topic and vgroup
  SMqRspObj rspObj = {.resType = RES_TYPE__TMQ, .vgId = pVg->vgId};
  tstrncpy(rspObj.topic, tname, tListLen(rspObj.topic));

  tscDebug("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, pVg->vgId);

  tsem_init(&pInfo->sem, 0, 0);
  pInfo->code = 0;

  asyncCommitOffset(tmq, &rspObj, TDMT_VND_TMQ_SEEK_TO_OFFSET, commitCallBackFn, pInfo);

  // wait for commitCallBackFn to store the code and post the semaphore
  tsem_wait(&pInfo->sem);
  int32_t code = pInfo->code;

  tsem_destroy(&pInfo->sem);
  taosMemoryFree(pInfo);

  if (code != TSDB_CODE_SUCCESS) {
    tscError("consumer:0x%" PRIx64 " failed to send seek to vgId:%d, code:%s", tmq->consumerId, pVg->vgId,
             tstrerror(code));
  }

  return code;
}