clientTmq.c 91.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "cJSON.h"
17 18 19
#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
H
Haojun Liao 已提交
20
#include "tdatablock.h"
21 22
#include "tdef.h"
#include "tglobal.h"
X
Xiaoyu Wang 已提交
23
#include "tqueue.h"
24
#include "tref.h"
L
Liu Jicong 已提交
25 26
#include "ttimer.h"

X
Xiaoyu Wang 已提交
27 28
// Idle duration applied after an empty block is received (unit is not visible here -- TODO confirm).
#define EMPTY_BLOCK_POLL_IDLE_DURATION 10
// Default assigned to tmq_conf_t.autoCommitInterval by tmq_conf_new().
#define DEFAULT_AUTO_COMMIT_INTERVAL   5000

// Evaluates to true when the given offset value is negative.
#define OFFSET_IS_RESET_OFFSET(_of)  ((_of) < 0)

32 33
// Callback signature accepted by asyncAskEp(); addToQueueCallbackFn() has this shape.
typedef void (*__tmq_askep_fn_t)(tmq_t* pTmq, int32_t code, SDataBuf* pBuf, void* pParam);

X
Xiaoyu Wang 已提交
34
struct SMqMgmt {
35 36 37
  int8_t  inited;
  tmr_h   timer;
  int32_t rsetId;
38
};
L
Liu Jicong 已提交
39

X
Xiaoyu Wang 已提交
40 41
static TdThreadOnce   tmqInit = PTHREAD_ONCE_INIT;  // initialize only once
volatile int32_t      tmqInitRes = 0;               // initialize rsp code
42
static struct SMqMgmt tmqMgmt = {0};
43

L
Liu Jicong 已提交
44 45 46 47 48 49
// Minimal response header: a response-type tag followed by the epoch.
// SMqAskEpRspWrapper and SMqPollRspWrapper start with the same two fields.
typedef struct {
  int8_t  tmqRspType;
  int32_t epoch;
} SMqRspWrapper;

typedef struct {
L
Liu Jicong 已提交
50 51 52
  int8_t      tmqRspType;
  int32_t     epoch;
  SMqAskEpRsp msg;
L
Liu Jicong 已提交
53 54
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
55
struct tmq_list_t {
L
Liu Jicong 已提交
56
  SArray container;
L
Liu Jicong 已提交
57
};
L
Liu Jicong 已提交
58

L
Liu Jicong 已提交
59
struct tmq_conf_t {
60 61 62 63 64 65 66 67
  char           clientId[256];
  char           groupId[TSDB_CGROUP_LEN];
  int8_t         autoCommit;
  int8_t         resetOffset;
  int8_t         withTbName;
  int8_t         snapEnable;
  int32_t        snapBatchSize;
  bool           hbBgEnable;
68 69 70 71 72
  uint16_t       port;
  int32_t        autoCommitInterval;
  char*          ip;
  char*          user;
  char*          pass;
73
  tmq_commit_cb* commitCb;
L
Liu Jicong 已提交
74
  void*          commitCbUserParam;
L
Liu Jicong 已提交
75 76 77
};

struct tmq_t {
78 79 80 81 82 83 84
  int64_t        refId;
  char           groupId[TSDB_CGROUP_LEN];
  char           clientId[256];
  int8_t         withTbName;
  int8_t         useSnapshot;
  int8_t         autoCommit;
  int32_t        autoCommitInterval;
85
  int8_t         resetOffsetCfg;
86 87
  uint64_t       consumerId;
  bool           hbBgEnable;
L
Liu Jicong 已提交
88 89
  tmq_commit_cb* commitCb;
  void*          commitCbUserParam;
L
Liu Jicong 已提交
90 91

  // status
wmmhello's avatar
wmmhello 已提交
92
  SRWLatch        lock;
L
Liu Jicong 已提交
93 94
  int8_t  status;
  int32_t epoch;
L
Liu Jicong 已提交
95 96
#if 0
  int8_t  epStatus;
L
Liu Jicong 已提交
97
  int32_t epSkipCnt;
L
Liu Jicong 已提交
98
#endif
99
  // poll info
X
Xiaoyu Wang 已提交
100 101
  int64_t pollCnt;
  int64_t totalRows;
L
Liu Jicong 已提交
102

L
Liu Jicong 已提交
103
  // timer
X
Xiaoyu Wang 已提交
104 105 106 107 108 109 110 111 112 113
  tmr_h       hbLiveTimer;
  tmr_h       epTimer;
  tmr_h       reportTimer;
  tmr_h       commitTimer;
  STscObj*    pTscObj;       // connection
  SArray*     clientTopics;  // SArray<SMqClientTopic>
  STaosQueue* mqueue;        // queue of rsp
  STaosQall*  qall;
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit
  tsem_t      rspSem;
L
Liu Jicong 已提交
114 115
};

116 117
typedef struct SAskEpInfo {
  int32_t code;
H
Haojun Liao 已提交
118
  tsem_t  sem;
119 120
} SAskEpInfo;

X
Xiaoyu Wang 已提交
121 122 123 124 125 126 127 128
// Values for SMqClientVg.vgStatus (presumably -- confirm against the poll path).
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT = 1,
};

// Consumer status values (presumably stored in tmq_t.status -- confirm).
enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY = 1,
  TMQ_CONSUMER_STATUS__NO_TOPIC = 2,
  TMQ_CONSUMER_STATUS__RECOVER = 3,
};

// Task types written to tmq_t.delayedTask by the timer callbacks.
enum {
  TMQ_DELAYED_TASK__ASK_EP = 1,
  TMQ_DELAYED_TASK__REPORT = 2,
  TMQ_DELAYED_TASK__COMMIT = 3,
};

H
Haojun Liao 已提交
139
typedef struct SVgOffsetInfo {
L
Liu Jicong 已提交
140 141
  STqOffsetVal committedOffset;
  STqOffsetVal currentOffset;
H
Haojun Liao 已提交
142 143 144 145 146 147 148 149 150 151
  int64_t      walVerBegin;
  int64_t      walVerEnd;
} SVgOffsetInfo;

typedef struct {
  int64_t       pollCnt;
  int64_t       numOfRows;
  SVgOffsetInfo offsetInfo;
  int32_t       vgId;
  int32_t       vgStatus;
H
Haojun Liao 已提交
152 153 154 155
  int32_t       vgSkipCnt;              // here used to mark the slow vgroups
  bool          receivedInfoFromVnode;  // has already received info from vnode
  int64_t       emptyBlockReceiveTs;    // once empty block is received, idle for ignoreCnt then start to poll data
  bool          seekUpdated;            // offset is updated by seek operator, therefore, not update by vnode rsp.
H
Haojun Liao 已提交
156
  SEpSet        epSet;
157 158
} SMqClientVg;

L
Liu Jicong 已提交
159
typedef struct {
160 161 162
  char           topicName[TSDB_TOPIC_FNAME_LEN];
  char           db[TSDB_DB_FNAME_LEN];
  SArray*        vgs;  // SArray<SMqClientVg>
L
Liu Jicong 已提交
163
  SSchemaWrapper schema;
164 165
} SMqClientTopic;

L
Liu Jicong 已提交
166 167
typedef struct {
  int8_t          tmqRspType;
168
  int32_t         epoch;  // epoch can be used to guard the vgHandle
169
  int32_t         vgId;
wmmhello's avatar
wmmhello 已提交
170
  char            topicName[TSDB_TOPIC_FNAME_LEN];
L
Liu Jicong 已提交
171 172
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
H
Haojun Liao 已提交
173
  uint64_t        reqId;
174
  SEpSet*         pEpset;
L
Liu Jicong 已提交
175
  union {
L
Liu Jicong 已提交
176 177
    SMqDataRsp dataRsp;
    SMqMetaRsp metaRsp;
L
Liu Jicong 已提交
178
    STaosxRsp  taosxRsp;
L
Liu Jicong 已提交
179
  };
L
Liu Jicong 已提交
180 181
} SMqPollRspWrapper;

L
Liu Jicong 已提交
182
typedef struct {
wmmhello's avatar
wmmhello 已提交
183 184
//  int64_t refId;
//  int32_t epoch;
L
Liu Jicong 已提交
185 186
  tsem_t  rspSem;
  int32_t rspErr;
L
Liu Jicong 已提交
187
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
188

L
Liu Jicong 已提交
189
typedef struct {
190 191 192 193
  int64_t          refId;
  int32_t          epoch;
  void*            pParam;
  __tmq_askep_fn_t pUserFn;
194 195
} SMqAskEpCbParam;

L
Liu Jicong 已提交
196
typedef struct {
197 198
  int64_t         refId;
  int32_t         epoch;
wmmhello's avatar
wmmhello 已提交
199 200 201
  char            topicName[TSDB_TOPIC_FNAME_LEN];
//  SMqClientVg*    pVg;
//  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
202
  int32_t         vgId;
X
Xiaoyu Wang 已提交
203
  uint64_t        requestId;  // request id for debug purpose
X
Xiaoyu Wang 已提交
204
} SMqPollCbParam;
205

206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
// State referenced by every SMqVgWalInfoParam of one request batch through its pCommon pointer.
typedef struct SMqVgCommon {
  tsem_t        rsp;
  int32_t       numOfRsp;
  SArray*       pList;
  TdThreadMutex mutex;
  int64_t       consumerId;
  char*         pTopicName;
  int32_t       code;
} SMqVgCommon;

// Per-vgroup request context pointing at the SMqVgCommon it shares with its siblings.
typedef struct SMqVgWalInfoParam {
  int32_t      vgId;
  int32_t      epoch;
  int32_t      totalReq;
  SMqVgCommon* pCommon;
} SMqVgWalInfoParam;

223
typedef struct {
224 225
  int64_t        refId;
  int32_t        epoch;
L
Liu Jicong 已提交
226 227
  int32_t        waitingRspNum;
  int32_t        totalRspNum;
228
  int32_t        code;
229
  tmq_commit_cb* callbackFn;
L
Liu Jicong 已提交
230 231
  /*SArray*        successfulOffsets;*/
  /*SArray*        failedOffsets;*/
X
Xiaoyu Wang 已提交
232
  void* userParam;
233 234 235 236
} SMqCommitCbParamSet;

typedef struct {
  SMqCommitCbParamSet* params;
237
  SMqVgOffset*         pOffset;
H
Haojun Liao 已提交
238 239 240
  char                 topicName[TSDB_TOPIC_FNAME_LEN];
  int32_t              vgId;
  tmq_t*               pTmq;
241
} SMqCommitCbParam;
242

243 244 245 246 247
// Semaphore paired with a result code (presumably for a blocking commit -- TODO confirm).
typedef struct SSyncCommitInfo {
  tsem_t  sem;
  int32_t code;
} SSyncCommitInfo;

248
static int32_t doAskEp(tmq_t* tmq);
249 250
static int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg);
static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet);
251
static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicName, SMqCommitCbParamSet* pParamSet,
252
                               int32_t index, int32_t totalVgroups, int32_t type);
X
Xiaoyu Wang 已提交
253 254 255
static void    commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId);
static void    asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param);
static void    addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param);
256

257
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
258
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
259 260 261 262 263
  if (conf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return conf;
  }

264
  conf->withTbName = false;
L
Liu Jicong 已提交
265
  conf->autoCommit = true;
266
  conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL;
267
  conf->resetOffset = TMQ_OFFSET__RESET_EARLIEAST;
268
  conf->hbBgEnable = true;
269

270 271 272
  return conf;
}

L
Liu Jicong 已提交
273
/*
 * Release a configuration created by tmq_conf_new(), including the connection
 * strings (ip/user/pass) it owns. A NULL `conf` is ignored.
 */
void tmq_conf_destroy(tmq_conf_t* conf) {
  if (conf == NULL) {
    return;
  }

  if (conf->ip != NULL) {
    taosMemoryFree(conf->ip);
  }
  if (conf->user != NULL) {
    taosMemoryFree(conf->user);
  }
  if (conf->pass != NULL) {
    taosMemoryFree(conf->pass);
  }

  taosMemoryFree(conf);
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
289
  if (strcasecmp(key, "group.id") == 0) {
L
Liu Jicong 已提交
290
    tstrncpy(conf->groupId, value, TSDB_CGROUP_LEN);
L
Liu Jicong 已提交
291
    return TMQ_CONF_OK;
292
  }
L
Liu Jicong 已提交
293

294
  if (strcasecmp(key, "client.id") == 0) {
L
Liu Jicong 已提交
295
    tstrncpy(conf->clientId, value, 256);
L
Liu Jicong 已提交
296 297
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
298

299 300
  if (strcasecmp(key, "enable.auto.commit") == 0) {
    if (strcasecmp(value, "true") == 0) {
L
Liu Jicong 已提交
301
      conf->autoCommit = true;
L
Liu Jicong 已提交
302
      return TMQ_CONF_OK;
303
    } else if (strcasecmp(value, "false") == 0) {
L
Liu Jicong 已提交
304
      conf->autoCommit = false;
L
Liu Jicong 已提交
305 306 307 308
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
309
  }
L
Liu Jicong 已提交
310

311
  if (strcasecmp(key, "auto.commit.interval.ms") == 0) {
312
    conf->autoCommitInterval = taosStr2int64(value);
L
Liu Jicong 已提交
313 314 315
    return TMQ_CONF_OK;
  }

316 317 318
  if (strcasecmp(key, "auto.offset.reset") == 0) {
    if (strcasecmp(value, "none") == 0) {
      conf->resetOffset = TMQ_OFFSET__RESET_NONE;
L
Liu Jicong 已提交
319
      return TMQ_CONF_OK;
320 321
    } else if (strcasecmp(value, "earliest") == 0) {
      conf->resetOffset = TMQ_OFFSET__RESET_EARLIEAST;
L
Liu Jicong 已提交
322
      return TMQ_CONF_OK;
323 324
    } else if (strcasecmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_OFFSET__RESET_LATEST;
L
Liu Jicong 已提交
325 326 327 328 329
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
330

331 332
  if (strcasecmp(key, "msg.with.table.name") == 0) {
    if (strcasecmp(value, "true") == 0) {
333
      conf->withTbName = true;
L
Liu Jicong 已提交
334
      return TMQ_CONF_OK;
335
    } else if (strcasecmp(value, "false") == 0) {
336
      conf->withTbName = false;
L
Liu Jicong 已提交
337
      return TMQ_CONF_OK;
338 339 340 341 342
    } else {
      return TMQ_CONF_INVALID;
    }
  }

343 344
  if (strcasecmp(key, "experimental.snapshot.enable") == 0) {
    if (strcasecmp(value, "true") == 0) {
L
Liu Jicong 已提交
345
      conf->snapEnable = true;
346
      return TMQ_CONF_OK;
347
    } else if (strcasecmp(value, "false") == 0) {
L
Liu Jicong 已提交
348
      conf->snapEnable = false;
349 350 351 352 353 354
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }

355
  if (strcasecmp(key, "experimental.snapshot.batch.size") == 0) {
356
    conf->snapBatchSize = taosStr2int64(value);
L
Liu Jicong 已提交
357 358 359
    return TMQ_CONF_OK;
  }

360
  if (strcasecmp(key, "enable.heartbeat.background") == 0) {
X
Xiaoyu Wang 已提交
361 362 363 364 365 366 367 368 369 370
    //    if (strcasecmp(value, "true") == 0) {
    //      conf->hbBgEnable = true;
    //      return TMQ_CONF_OK;
    //    } else if (strcasecmp(value, "false") == 0) {
    //      conf->hbBgEnable = false;
    //      return TMQ_CONF_OK;
    //    } else {
    tscError("the default value of enable.heartbeat.background is true, can not be seted");
    return TMQ_CONF_INVALID;
    //    }
L
Liu Jicong 已提交
371 372
  }

373
  if (strcasecmp(key, "td.connect.ip") == 0) {
374
    conf->ip = taosStrdup(value);
L
Liu Jicong 已提交
375 376
    return TMQ_CONF_OK;
  }
377

378
  if (strcasecmp(key, "td.connect.user") == 0) {
379
    conf->user = taosStrdup(value);
L
Liu Jicong 已提交
380 381
    return TMQ_CONF_OK;
  }
382

383
  if (strcasecmp(key, "td.connect.pass") == 0) {
384
    conf->pass = taosStrdup(value);
L
Liu Jicong 已提交
385 386
    return TMQ_CONF_OK;
  }
387

388
  if (strcasecmp(key, "td.connect.port") == 0) {
389
    conf->port = taosStr2int64(value);
L
Liu Jicong 已提交
390 391
    return TMQ_CONF_OK;
  }
392

393
  if (strcasecmp(key, "td.connect.db") == 0) {
L
Liu Jicong 已提交
394 395 396
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
397
  return TMQ_CONF_UNKNOWN;
398 399
}

X
Xiaoyu Wang 已提交
400
tmq_list_t* tmq_list_new() { return (tmq_list_t*)taosArrayInit(0, sizeof(void*)); }
401

L
Liu Jicong 已提交
402 403
/*
 * Append a copy of the topic name `src` to `list`.
 *
 * Returns 0 on success, or -1 when `list` is NULL, `src` is NULL or empty, or memory
 * could not be allocated. The list owns the copy; tmq_list_destroy() releases it.
 */
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  if (list == NULL || src == NULL || src[0] == 0) return -1;

  SArray* container = &list->container;
  char*   topic = taosStrdup(src);
  if (topic == NULL) return -1;

  if (taosArrayPush(container, &topic) == NULL) {
    taosMemoryFree(topic);  // the array did not take ownership; avoid leaking the copy
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
410
// Destroy the list; each stored element is released with taosMemoryFree.
void tmq_list_destroy(tmq_list_t* list) {
  taosArrayDestroyP(&list->container, taosMemoryFree);
}

L
Liu Jicong 已提交
415 416 417 418 419 420 421 422 423 424
// Number of entries currently held by the list.
int32_t tmq_list_get_size(const tmq_list_t* list) {
  return taosArrayGetSize(&list->container);
}

// Expose the list's backing storage as an array of C strings. The pointer is the list's own
// data buffer, not a copy.
char** tmq_list_to_c_array(const tmq_list_t* list) {
  return list->container.pData;
}

X
Xiaoyu Wang 已提交
425 426
/*
 * Look up the vgroup `vgId` of the topic named `pName` in `pTopicList` (SArray<SMqClientTopic>).
 *
 * On a hit, returns the vgroup, stores its position within the topic's vgroup array in *index,
 * and stores that array's size in *numOfVgroups. On a miss, returns NULL with *index == -1;
 * *numOfVgroups is the vgroup count of the last topic whose name matched, or 0 if none matched.
 */
static SMqClientVg* foundClientVg(SArray* pTopicList, const char* pName, int32_t vgId, int32_t* index,
                                  int32_t* numOfVgroups) {
  int32_t topicCnt = taosArrayGetSize(pTopicList);
  *index = -1;
  *numOfVgroups = 0;

  for (int32_t t = 0; t < topicCnt; ++t) {
    SMqClientTopic* pEntry = taosArrayGet(pTopicList, t);
    if (strcmp(pEntry->topicName, pName) == 0) {
      *numOfVgroups = taosArrayGetSize(pEntry->vgs);

      int32_t v = 0;
      while (v < (*numOfVgroups)) {
        SMqClientVg* pCandidate = taosArrayGet(pEntry->vgs, v);
        if (pCandidate->vgId == vgId) {
          *index = v;
          return pCandidate;
        }
        ++v;
      }
    }
  }

  return NULL;
}
449

450 451 452
// Two problems do not need to be addressed here
// 1. update to of epset. the response of poll request will automatically handle this problem
// 2. commit failure. This one needs to be resolved.
H
Haojun Liao 已提交
453
static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
454
  SMqCommitCbParam*    pParam = (SMqCommitCbParam*)param;
455
  SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
H
Haojun Liao 已提交
456

X
Xiaoyu Wang 已提交
457 458 459 460 461 462 463 464 465 466 467 468 469 470
  //  if (code != TSDB_CODE_SUCCESS) { // if commit offset failed, let's try again
  //    taosThreadMutexLock(&pParam->pTmq->lock);
  //    int32_t numOfVgroups, index;
  //    SMqClientVg* pVg = foundClientVg(pParam->pTmq->clientTopics, pParam->topicName, pParam->vgId, &index,
  //    &numOfVgroups); if (pVg == NULL) {
  //      tscDebug("consumer:0x%" PRIx64
  //               " subKey:%s vgId:%d commit failed, code:%s has been transferred to other consumer, no need retry
  //               ordinal:%d/%d", pParam->pTmq->consumerId, pParam->pOffset->subKey, pParam->vgId, tstrerror(code),
  //               index + 1, numOfVgroups);
  //    } else { // let's retry the commit
  //      int32_t code1 = doSendCommitMsg(pParam->pTmq, pVg, pParam->topicName, pParamSet, index, numOfVgroups);
  //      if (code1 != TSDB_CODE_SUCCESS) {  // retry failed.
  //        tscError("consumer:0x%" PRIx64 " topic:%s vgId:%d offset:%" PRId64
  //                 " retry failed, ignore this commit. code:%s ordinal:%d/%d",
H
Haojun Liao 已提交
471
  //                 pParam->pTmq->consumerId, pParam->topicName, pVg->vgId, pVg->offsetInfo.committedOffset.version,
X
Xiaoyu Wang 已提交
472 473 474 475 476 477 478 479 480 481 482 483 484 485 486
  //                 tstrerror(terrno), index + 1, numOfVgroups);
  //      }
  //    }
  //
  //    taosThreadMutexUnlock(&pParam->pTmq->lock);
  //
  //    taosMemoryFree(pParam->pOffset);
  //    taosMemoryFree(pBuf->pData);
  //    taosMemoryFree(pBuf->pEpSet);
  //
  //    commitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId);
  //    return 0;
  //  }
  //
  //  // todo replace the pTmq with refId
487

L
Liu Jicong 已提交
488
  taosMemoryFree(pParam->pOffset);
L
Liu Jicong 已提交
489
  taosMemoryFree(pBuf->pData);
dengyihao's avatar
dengyihao 已提交
490
  taosMemoryFree(pBuf->pEpSet);
L
Liu Jicong 已提交
491

492
  commitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId);
493 494 495
  return 0;
}

496
/*
 * Build and asynchronously send one commit-offset request for `pVg`.
 *
 * tmq           consumer issuing the commit
 * pVg           vgroup whose currentOffset is committed
 * pTopicName    topic name; combined with the group id into the subscription key
 * pParamSet     shared commit context; its waitingRspNum/totalRspNum are incremented
 * index         position of the vgroup in its topic (logging only)
 * totalVgroups  number of vgroups in the topic (logging only)
 * type          message type placed in the request
 *
 * Returns TSDB_CODE_SUCCESS once the request has been handed to the transport,
 * TSDB_CODE_OUT_OF_MEMORY on allocation failure, or TSDB_CODE_INVALID_PARA when the encoded
 * size cannot be computed. Nothing is leaked and the counters are untouched on failure.
 */
static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicName, SMqCommitCbParamSet* pParamSet,
                               int32_t index, int32_t totalVgroups, int32_t type) {
  SMqVgOffset* pOffset = taosMemoryCalloc(1, sizeof(SMqVgOffset));
  if (pOffset == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pOffset->consumerId = tmq->consumerId;
  pOffset->offset.val = pVg->offsetInfo.currentOffset;

  // subKey = <groupId><TMQ_SEPARATOR><topicName>
  // NOTE(review): the copy is unbounded; presumably subKey is sized for group + topic -- confirm.
  int32_t groupLen = strlen(tmq->groupId);
  memcpy(pOffset->offset.subKey, tmq->groupId, groupLen);
  pOffset->offset.subKey[groupLen] = TMQ_SEPARATOR;
  strcpy(pOffset->offset.subKey + groupLen + 1, pTopicName);

  int32_t len = 0;
  int32_t code = 0;
  tEncodeSize(tEncodeMqVgOffset, pOffset, len, code);
  if (code < 0) {
    taosMemoryFree(pOffset);  // previously leaked on this path
    return TSDB_CODE_INVALID_PARA;
  }

  void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
  if (buf == NULL) {
    taosMemoryFree(pOffset);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  ((SMsgHead*)buf)->vgId = htonl(pVg->vgId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, len);
  tEncodeMqVgOffset(&encoder, pOffset);
  tEncoderClear(&encoder);

  // build param
  SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
  if (pParam == NULL) {
    taosMemoryFree(pOffset);
    taosMemoryFree(buf);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pParam->params = pParamSet;
  pParam->pOffset = pOffset;  // released by tmqCommitCb()
  pParam->vgId = pVg->vgId;
  pParam->pTmq = tmq;

  tstrncpy(pParam->topicName, pTopicName, tListLen(pParam->topicName));

  // build send info
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    taosMemoryFree(pOffset);
    taosMemoryFree(buf);
    taosMemoryFree(pParam);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pMsgSendInfo->msgInfo = (SDataBuf) { .pData = buf, .len = sizeof(SMsgHead) + len, .handle = NULL };

  pMsgSendInfo->requestId = generateRequestId();
  pMsgSendInfo->requestObjRefId = 0;
  pMsgSendInfo->param = pParam;
  pMsgSendInfo->paramFreeFp = taosMemoryFree;
  pMsgSendInfo->fp = tmqCommitCb;
  pMsgSendInfo->msgType = type;

  // Count the request before it is sent so the response callback never sees a stale counter.
  atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
  atomic_add_fetch_32(&pParamSet->totalRspNum, 1);

  SEp* pEp = GET_ACTIVE_EP(&pVg->epSet);
  char offsetBuf[TSDB_OFFSET_LEN] = {0};
  tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffset->offset.val);

  char commitBuf[TSDB_OFFSET_LEN] = {0};
  tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset);
  tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d send offset:%s prev:%s, ep:%s:%d, ordinal:%d/%d, req:0x%" PRIx64,
           tmq->consumerId, pOffset->offset.subKey, pVg->vgId, offsetBuf, commitBuf, pEp->fqdn, pEp->port, index + 1,
           totalVgroups, pMsgSendInfo->requestId);

  // NOTE(review): the send result is not checked.
  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
585 586 587 588 589 590 591 592 593 594 595
/*
 * Return the first entry of tmq->clientTopics whose topicName equals `pTopicName`.
 * Logs an error and returns NULL when no such topic exists.
 */
static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) {
  int32_t total = taosArrayGetSize(tmq->clientTopics);

  for (int32_t idx = 0; idx < total; ++idx) {
    SMqClientTopic* pCandidate = taosArrayGet(tmq->clientTopics, idx);
    if (strcmp(pCandidate->topicName, pTopicName) == 0) {
      return pCandidate;
    }
  }

  tscError("consumer:0x%" PRIx64 ", total:%d, failed to find topic:%s", tmq->consumerId, total, pTopicName);
  return NULL;
}

600
/*
 * Commit the offset of the topic/vgroup that produced `pRes`.
 *
 * tmq        consumer handle
 * pRes       a result object of one of the three TMQ result kinds
 * type       message type forwarded to doSendCommitMsg()
 * pCommitFp  callback that receives the outcome
 * userParam  opaque argument for pCommitFp
 *
 * pCommitFp is called directly when the arguments are invalid, memory is exhausted, the
 * topic/vgroup is not found (reported as success), nothing needs committing, or sending
 * fails. When a request is sent, the outcome is delivered through the response path
 * (tmqCommitCb -> commitRspCountDown).
 */
static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tmq_commit_cb* pCommitFp, void* userParam) {
  char*   pTopicName = NULL;
  int32_t vgId = 0;

  if (pRes == NULL || tmq == NULL) {
    pCommitFp(tmq, TSDB_CODE_INVALID_PARA, userParam);
    return;
  }

  // Extract topic name and vgroup id from whichever result kind was passed in.
  if (TD_RES_TMQ(pRes)) {
    SMqRspObj* pDataObj = (SMqRspObj*)pRes;
    pTopicName = pDataObj->topic;
    vgId = pDataObj->vgId;
  } else if (TD_RES_TMQ_META(pRes)) {
    SMqMetaRspObj* pMetaObj = (SMqMetaRspObj*)pRes;
    pTopicName = pMetaObj->topic;
    vgId = pMetaObj->vgId;
  } else if (TD_RES_TMQ_METADATA(pRes)) {
    SMqTaosxRspObj* pTaosxObj = (SMqTaosxRspObj*)pRes;
    pTopicName = pTaosxObj->topic;
    vgId = pTaosxObj->vgId;
  } else {
    pCommitFp(tmq, TSDB_CODE_TMQ_INVALID_MSG, userParam);
    return;
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    pCommitFp(tmq, TSDB_CODE_OUT_OF_MEMORY, userParam);
    return;
  }

  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;
  pParamSet->callbackFn = pCommitFp;
  pParamSet->userParam = userParam;

  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);

  tscDebug("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId);

  SMqClientTopic* pTopic = getTopicByName(tmq, pTopicName);
  if (pTopic == NULL) {
    tscWarn("consumer:0x%" PRIx64 " failed to find the specified topic:%s, total topics:%d", tmq->consumerId,
            pTopicName, numOfTopics);
    taosMemoryFree(pParamSet);
    pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam);
    return;
  }

  // Locate the vgroup inside the topic.
  int32_t      numOfVgroups = taosArrayGetSize(pTopic->vgs);
  SMqClientVg* pVg = NULL;
  int32_t      pos = 0;
  for (; pos < numOfVgroups; ++pos) {
    SMqClientVg* pCur = taosArrayGet(pTopic->vgs, pos);
    if (pCur->vgId == vgId) {
      pVg = pCur;
      break;
    }
  }

  if (pVg == NULL) {
    tscWarn("consumer:0x%" PRIx64 " failed to find the specified vgId:%d, total Vgs:%d, topic:%s", tmq->consumerId,
            vgId, numOfVgroups, pTopicName);
    taosMemoryFree(pParamSet);
    pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam);
    return;
  }

  int32_t code = 0;
  bool    needCommit = pVg->offsetInfo.currentOffset.type > 0 &&
                    !tOffsetEqual(&pVg->offsetInfo.currentOffset, &pVg->offsetInfo.committedOffset);
  if (needCommit) {
    code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, pos, numOfVgroups, type);
    if (code == TSDB_CODE_SUCCESS) {
      return;  // request is in flight; pParamSet now belongs to the response path
    }
  }

  // Either nothing had to be committed (code == 0) or sending failed: report directly.
  taosMemoryFree(pParamSet);
  pCommitFp(tmq, code, userParam);
}

683
/*
 * Send a commit request for every vgroup of every subscribed topic whose current offset
 * differs from its committed offset.
 *
 * tmq        consumer handle
 * pCommitFp  callback that receives the outcome
 * userParam  opaque argument for pCommitFp
 *
 * pCommitFp is called directly on allocation failure or when no request was sent. Otherwise
 * completion is signalled through commitRspCountDown().
 */
static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) {
  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    pCommitFp(tmq, TSDB_CODE_OUT_OF_MEMORY, userParam);
    return;
  }

  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;
  pParamSet->callbackFn = pCommitFp;
  pParamSet->userParam = userParam;

  // init as 1 to prevent concurrency issue
  pParamSet->waitingRspNum = 1;

  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
  tscDebug("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics);

  for (int32_t i = 0; i < numOfTopics; i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    int32_t         numOfVgroups = taosArrayGetSize(pTopic->vgs);

    tscDebug("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName,
             numOfVgroups);
    for (int32_t j = 0; j < numOfVgroups; j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);

      if (pVg->offsetInfo.currentOffset.type > 0 && !tOffsetEqual(&pVg->offsetInfo.currentOffset, &pVg->offsetInfo.committedOffset)) {
        int32_t code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups, TDMT_VND_TMQ_COMMIT_OFFSET);
        if (code != TSDB_CODE_SUCCESS) {
          // Report the code returned by doSendCommitMsg(); terrno is not set by that call.
          tscError("consumer:0x%" PRIx64 " topic:%s vgId:%d offset:%" PRId64 " failed, code:%s ordinal:%d/%d",
                   tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->offsetInfo.committedOffset.version, tstrerror(code),
                   j + 1, numOfVgroups);
          continue;
        }

        // update the offset value.
        pVg->offsetInfo.committedOffset = pVg->offsetInfo.currentOffset;
      } else {
        tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, current:%" PRId64 ", ordinal:%d/%d",
                 tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->offsetInfo.currentOffset.version, j + 1, numOfVgroups);
      }
    }
  }

  // totalRspNum counts the requests actually sent; waitingRspNum may already have changed
  // if responses arrived in the meantime, so it is not used for this log line.
  tscDebug("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->totalRspNum,
           numOfTopics);

  // no request is sent
  if (pParamSet->totalRspNum == 0) {
    taosMemoryFree(pParamSet);
    pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam);
    return;
  }

  // count down since waiting rsp num init as 1
  commitRspCountDown(pParamSet, tmq->consumerId, "", 0);
}

742 743
static void generateTimedTask(int64_t refId, int32_t type) {
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
744
  if (tmq != NULL) {
S
Shengliang Guan 已提交
745
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
746
    *pTaskType = type;
747 748
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
wmmhello's avatar
wmmhello 已提交
749
    taosReleaseRef(tmqMgmt.rsetId, refId);
750
  }
751 752 753 754 755
}

// Timer callback: queue an ASK_EP delayed task for the consumer whose refId is stored in
// `param` (a heap-allocated int64_t), then release `param`. `tmrId` is unused.
void tmqAssignAskEpTask(void* param, void* tmrId) {
  int64_t* pRefId = (int64_t*)param;
  int64_t  consumerRef = *pRefId;

  generateTimedTask(consumerRef, TMQ_DELAYED_TASK__ASK_EP);
  taosMemoryFree(param);
}

void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
760
  int64_t refId = *(int64_t*)param;
761
  generateTimedTask(refId, TMQ_DELAYED_TASK__COMMIT);
762
  taosMemoryFree(param);
L
Liu Jicong 已提交
763 764 765
}

void tmqAssignDelayedReportTask(void* param, void* tmrId) {
766 767 768
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
S
Shengliang Guan 已提交
769
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
770 771 772 773
    *pTaskType = TMQ_DELAYED_TASK__REPORT;
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
  }
774 775

  taosReleaseRef(tmqMgmt.rsetId, refId);
776
  taosMemoryFree(param);
L
Liu Jicong 已提交
777 778
}

779
// Heartbeat response callback: releases the response payload and endpoint set when a response
// buffer is supplied. `param` and `code` are ignored. Always returns 0.
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg == NULL) {
    return 0;
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);
  return 0;
}

void tmqSendHbReq(void* param, void* tmrId) {
788
  int64_t refId = *(int64_t*)param;
789

X
Xiaoyu Wang 已提交
790
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
791
  if (tmq == NULL) {
L
Liu Jicong 已提交
792
    taosMemoryFree(param);
793 794
    return;
  }
D
dapan1121 已提交
795 796 797 798

  SMqHbReq req = {0};
  req.consumerId = tmq->consumerId;
  req.epoch = tmq->epoch;
799 800 801 802 803 804 805 806 807 808 809 810 811
  req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows));
  for(int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++){
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    int32_t         numOfVgroups = taosArrayGetSize(pTopic->vgs);
    TopicOffsetRows* data = taosArrayReserve(req.topics, 1);
    strcpy(data->topicName, pTopic->topicName);
    data->offsetRows = taosArrayInit(numOfVgroups, sizeof(OffsetRows));
    for(int j = 0; j < numOfVgroups; j++){
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1);
      offRows->vgId = pVg->vgId;
      offRows->rows = pVg->numOfRows;
      offRows->offset = pVg->offsetInfo.committedOffset;
812
      tscDebug("report row:%lldd, offset:%" PRId64, offRows->rows, offRows->offset.version);
813 814
    }
  }
D
dapan1121 已提交
815

L
Liu Jicong 已提交
816
  int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);
D
dapan1121 已提交
817 818
  if (tlen < 0) {
    tscError("tSerializeSMqHbReq failed");
819
    goto OVER;
D
dapan1121 已提交
820
  }
821

L
Liu Jicong 已提交
822
  void* pReq = taosMemoryCalloc(1, tlen);
D
dapan1121 已提交
823 824
  if (tlen < 0) {
    tscError("failed to malloc MqHbReq msg, size:%d", tlen);
825
    goto OVER;
D
dapan1121 已提交
826
  }
827

D
dapan1121 已提交
828 829 830
  if (tSerializeSMqHbReq(pReq, tlen, &req) < 0) {
    tscError("tSerializeSMqHbReq %d failed", tlen);
    taosMemoryFree(pReq);
831
    goto OVER;
D
dapan1121 已提交
832
  }
833 834 835 836

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(pReq);
L
Liu Jicong 已提交
837
    goto OVER;
838
  }
839

840
  sendInfo->msgInfo = (SDataBuf){ .pData = pReq, .len = tlen, .handle = NULL };
841 842 843 844 845

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = NULL;
  sendInfo->fp = tmqHbCb;
L
Liu Jicong 已提交
846
  sendInfo->msgType = TDMT_MND_TMQ_HB;
847 848 849 850 851 852 853

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

OVER:
854
  tDeatroySMqHbReq(&req);
855
  taosTmrReset(tmqSendHbReq, 1000, param, tmqMgmt.timer, &tmq->hbLiveTimer);
856
  taosReleaseRef(tmqMgmt.rsetId, refId);
857 858
}

859 860
// Commit callback used when the user did not register one: logs a debug line
// when the commit finished with a non-zero code, otherwise does nothing.
// `param` is not used.
static void defaultCommitCbFn(tmq_t* pTmq, int32_t code, void* param) {
  if (code == 0) {
    return;
  }

  tscDebug("consumer:0x%" PRIx64 ", failed to commit offset, code:%s", pTmq->consumerId, tstrerror(code));
}

865
// Drain the consumer's delayed-task queue and execute every task found.
//
//  - TMQ_DELAYED_TASK__ASK_EP: asks the mnode for endpoints asynchronously and
//    re-arms the ep timer for one second later.
//  - TMQ_DELAYED_TASK__COMMIT: commits all offsets asynchronously (user
//    callback if registered, defaultCommitCbFn otherwise) and re-arms the
//    commit timer with the configured auto-commit interval.
//  - TMQ_DELAYED_TASK__REPORT: no action.
//
// Returns TSDB_CODE_SUCCESS / 0 once the queue has been processed, or
// TSDB_CODE_OUT_OF_MEMORY when the temporary item list cannot be allocated.
// If the reference-id box for a timer cannot be allocated, that timer is not
// re-armed and an error is logged.
int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
  STaosQall* qall = taosAllocateQall();
  if (qall == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  taosReadAllQitems(pTmq->delayedTask, qall);

  if (qall->numOfItems == 0) {
    taosFreeQall(qall);
    return TSDB_CODE_SUCCESS;
  }

  tscDebug("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, qall->numOfItems);
  int8_t* pTaskType = NULL;
  taosGetQitem(qall, (void**)&pTaskType);

  while (pTaskType != NULL) {
    if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
      asyncAskEp(pTmq, addToQueueCallbackFn, NULL);

      // the timer callback receives (and frees) its own copy of the ref id
      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      if (pRefId == NULL) {
        tscError("consumer:0x%" PRIx64 " failed to re-arm ep timer since out of memory", pTmq->consumerId);
      } else {
        *pRefId = pTmq->refId;

        tscDebug("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId);
        taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer);
      }
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
      tmq_commit_cb* pCallbackFn = pTmq->commitCb ? pTmq->commitCb : defaultCommitCbFn;

      asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam);

      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      if (pRefId == NULL) {
        tscError("consumer:0x%" PRIx64 " failed to re-arm commit timer since out of memory", pTmq->consumerId);
      } else {
        *pRefId = pTmq->refId;

        tscDebug("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId,
                 pTmq->autoCommitInterval / 1000.0);
        taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer);
      }
    } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
      // nothing to do for report tasks
    }

    taosFreeQitem(pTaskType);
    taosGetQitem(qall, (void**)&pTaskType);
  }

  taosFreeQall(qall);
  return 0;
}

908
// Release the resources owned by a queued response wrapper, selected by its
// tmqRspType. The wrapper itself is NOT freed here. Unknown types and
// TMQ_MSG_TYPE__END_RSP own nothing and are left untouched.
// Always returns NULL.
static void* tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
  switch (rspWrapper->tmqRspType) {
    case TMQ_MSG_TYPE__EP_RSP: {
      SMqAskEpRspWrapper* pEpWrapper = (SMqAskEpRspWrapper*)rspWrapper;
      tDeleteSMqAskEpRsp(&pEpWrapper->msg);
      break;
    }

    case TMQ_MSG_TYPE__POLL_RSP: {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)rspWrapper;
      taosMemoryFreeClear(pPoll->pEpset);

      taosArrayDestroyP(pPoll->dataRsp.blockData, taosMemoryFree);
      taosArrayDestroy(pPoll->dataRsp.blockDataLen);
      taosArrayDestroyP(pPoll->dataRsp.blockTbName, taosMemoryFree);
      taosArrayDestroyP(pPoll->dataRsp.blockSchema, (FDelete)tDeleteSchemaWrapper);
      break;
    }

    case TMQ_MSG_TYPE__POLL_META_RSP: {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)rspWrapper;
      taosMemoryFreeClear(pPoll->pEpset);

      taosMemoryFree(pPoll->metaRsp.metaRsp);
      break;
    }

    case TMQ_MSG_TYPE__TAOSX_RSP: {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)rspWrapper;
      taosMemoryFreeClear(pPoll->pEpset);

      taosArrayDestroyP(pPoll->taosxRsp.blockData, taosMemoryFree);
      taosArrayDestroy(pPoll->taosxRsp.blockDataLen);
      taosArrayDestroyP(pPoll->taosxRsp.blockTbName, taosMemoryFree);
      taosArrayDestroyP(pPoll->taosxRsp.blockSchema, (FDelete)tDeleteSchemaWrapper);
      // taosx
      taosArrayDestroy(pPoll->taosxRsp.createTableLen);
      taosArrayDestroyP(pPoll->taosxRsp.createTableReq, taosMemoryFree);
      break;
    }

    case TMQ_MSG_TYPE__END_RSP:
    default:
      // do nothing
      break;
  }

  return NULL;
}

L
Liu Jicong 已提交
943
// Discard every response that has not been consumed yet: first the items
// already sitting in tmq->qall, then everything still pending in tmq->mqueue
// (moved into qall and drained the same way). Each item's payload is released
// with tmqFreeRspWrapper before the item itself is freed.
void tmqClearUnhandleMsg(tmq_t* tmq) {
  for (int32_t round = 0; round < 2; ++round) {
    if (round == 1) {
      // second round: pull whatever is still queued into qall
      taosReadAllQitems(tmq->mqueue, tmq->qall);
    }

    SMqRspWrapper* pItem = NULL;
    for (;;) {
      taosGetQitem(tmq->qall, (void**)&pItem);
      if (pItem == NULL) {
        break;
      }

      tmqFreeRspWrapper(pItem);
      taosFreeQitem(pItem);
    }
  }
}

D
dapan1121 已提交
968
// Response callback for the subscribe request sent by tmq_subscribe().
//
// Stores the response code in the caller's SMqSubscribeCbParam, releases the
// message payload and endpoint set (when a message is supplied, mirroring
// tmqHbCb), and posts the semaphore the caller is waiting on.
// Always returns 0.
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
  pParam->rspErr = code;

  if (pMsg != NULL) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
  }

  // must come last: the waiter owns pParam (it lives on the waiter's stack in tmq_subscribe)
  tsem_post(&pParam->rspSem);
  return 0;
}
976

L
Liu Jicong 已提交
977
// Append the names of all topics the consumer currently tracks to *topics.
//
// If *topics is NULL a new list is created with tmq_list_new() and stored back
// through `topics`. For each topic, the text after the first '.' of the stored
// topic name is appended; a stored name without a '.' is appended unchanged
// instead of reading past a NULL strchr() result.
//
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the list cannot be created.
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  if (*topics == NULL) {
    *topics = tmq_list_new();
    if (*topics == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
  for (int i = 0; i < numOfTopics; i++) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);

    char* sep = strchr(topic->topicName, '.');
    char* shortName = (sep != NULL) ? sep + 1 : topic->topicName;
    tmq_list_append(*topics, shortName);
  }
  return 0;
}

L
Liu Jicong 已提交
988
// Unsubscribe from all topics by subscribing to an empty topic list.
//
// While tmq_subscribe() reports TSDB_CODE_MND_CONSUMER_NOT_READY the call is
// retried after a 500ms sleep; retrying stops once the retry counter has
// exceeded 5. Returns the code of the last tmq_subscribe() call.
int32_t tmq_unsubscribe(tmq_t* tmq) {
  tmq_list_t* emptyList = tmq_list_new();
  int32_t     code;

  for (int32_t retries = 0;; ++retries) {
    code = tmq_subscribe(tmq, emptyList);
    if (code != TSDB_CODE_MND_CONSUMER_NOT_READY) {
      break;
    }
    if (retries > 5) {
      break;
    }
    taosMsleep(500);
  }

  tmq_list_destroy(emptyList);
  return code;
}

1006 1007 1008 1009 1010 1011
static void freeClientVgImpl(void* param) {
  SMqClientTopic* pTopic = param;
  taosMemoryFreeClear(pTopic->schema.pSchema);
  taosArrayDestroy(pTopic->vgs);
}

1012
void tmqFreeImpl(void* handle) {
1013 1014
  tmq_t*  tmq = (tmq_t*)handle;
  int64_t id = tmq->consumerId;
L
Liu Jicong 已提交
1015

1016
  // TODO stop timer
L
Liu Jicong 已提交
1017 1018 1019 1020
  if (tmq->mqueue) {
    tmqClearUnhandleMsg(tmq);
    taosCloseQueue(tmq->mqueue);
  }
L
Liu Jicong 已提交
1021

H
Haojun Liao 已提交
1022 1023 1024 1025 1026
  if (tmq->delayedTask) {
    taosCloseQueue(tmq->delayedTask);
  }

  taosFreeQall(tmq->qall);
1027
  tsem_destroy(&tmq->rspSem);
L
Liu Jicong 已提交
1028

1029
  taosArrayDestroyEx(tmq->clientTopics, freeClientVgImpl);
1030 1031
  taos_close_internal(tmq->pTscObj);
  taosMemoryFree(tmq);
1032 1033

  tscDebug("consumer:0x%" PRIx64 " closed", id);
L
Liu Jicong 已提交
1034 1035
}

1036 1037 1038 1039 1040 1041 1042 1043 1044
static void tmqMgmtInit(void) {
  tmqInitRes = 0;
  tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");

  if (tmqMgmt.timer == NULL) {
    tmqInitRes = TSDB_CODE_OUT_OF_MEMORY;
  }

  tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
1045
  if (tmqMgmt.rsetId < 0) {
1046 1047 1048 1049
    tmqInitRes = terrno;
  }
}

L
Liu Jicong 已提交
1050
// Create a consumer from the given configuration.
//
// Steps: one-time module init, allocate the consumer, create its queues,
// copy the configuration, generate a consumer id, init the response
// semaphore, connect to the server, register the consumer in the reference
// set and, if background heartbeat is enabled, start the heartbeat timer.
//
// Returns the new consumer, or NULL on failure. terrno is set explicitly for
// module-init and allocation failures; other failure paths leave whatever the
// failing call set. `errstr` / `errstrLen` are not written by this function.
//
// Cleanup notes:
//  - before the consumer is registered, failures go through `_failed`, which
//    calls tmqFreeImpl() directly; tmqFreeImpl() already destroys rspSem, so
//    no path destroys it a second time.
//  - after registration, the consumer is dropped with taosRemoveRef().
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  taosThreadOnce(&tmqInit, tmqMgmtInit);
  if (tmqInitRes != 0) {
    terrno = tmqInitRes;
    return NULL;
  }

  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tscError("failed to create consumer, groupId:%s, code:%s", conf->groupId, terrstr());
    return NULL;
  }

  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  pTmq->mqueue = taosOpenQueue();
  pTmq->delayedTask = taosOpenQueue();
  pTmq->qall = taosAllocateQall();

  if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL ||
      conf->groupId[0] == 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // pTmq->groupId has not been filled in yet, so log the configured group id
    tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), conf->groupId);
    goto _failed;
  }

  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;

  // set conf
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
  pTmq->withTbName = conf->withTbName;
  pTmq->useSnapshot = conf->snapEnable;
  pTmq->autoCommit = conf->autoCommit;
  pTmq->autoCommitInterval = conf->autoCommitInterval;
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
  pTmq->resetOffsetCfg = conf->resetOffset;
  taosInitRWLatch(&pTmq->lock);

  pTmq->hbBgEnable = conf->hbBgEnable;

  // assign consumerId
  pTmq->consumerId = tGenIdPI64();

  // init semaphore
  if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
    tscError("consumer:0x %" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             pTmq->groupId);
    goto _failed;
  }

  // init connection
  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) {
    tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId);
    // rspSem is destroyed by tmqFreeImpl() below; destroying it here too would be a double destroy
    goto _failed;
  }

  pTmq->refId = taosAddRef(tmqMgmt.rsetId, pTmq);
  if (pTmq->refId < 0) {
    goto _failed;
  }

  if (pTmq->hbBgEnable) {
    // the timer callback owns this box and frees it when the consumer is gone
    int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId);
      // NOTE(review): the consumer is registered now; taosRemoveRef is expected to run
      // tmqFreeImpl once the last reference is gone - confirm against tref.h
      taosRemoveRef(tmqMgmt.rsetId, pTmq->refId);
      return NULL;
    }
    *pRefId = pTmq->refId;
    pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pRefId, tmqMgmt.timer);
  }

  char         buf[TSDB_OFFSET_LEN] = {0};
  STqOffsetVal offset = {.type = pTmq->resetOffsetCfg};
  tFormatOffset(buf, tListLen(buf), &offset);
  tscInfo("consumer:0x%" PRIx64 " is setup, refId:%" PRId64
          ", groupId:%s, snapshot:%d, autoCommit:%d, commitInterval:%dms, offset:%s, backgroudHB:%d",
          pTmq->consumerId, pTmq->refId, pTmq->groupId, pTmq->useSnapshot, pTmq->autoCommit, pTmq->autoCommitInterval,
          buf, pTmq->hbBgEnable);

  return pTmq;

_failed:
  tmqFreeImpl(pTmq);
  return NULL;
}

L
Liu Jicong 已提交
1142
// Subscribe the consumer to the topics in `topic_list` (an empty list
// unsubscribes from everything).
//
// Builds an SCMSubscribeReq with the consumer's settings and the full names of
// all topics, sends it to the mnode and blocks until the response arrives.
// Afterwards it polls doAskEp() every 500ms while the mnode reports
// TSDB_CODE_MND_CONSUMER_NOT_READY (bounded by MAX_RETRY_COUNT), then starts the
// ep timer and, when auto commit is enabled, the commit timer if they are not
// running yet.
//
// Returns 0 on success; TSDB_CODE_OUT_OF_MEMORY for any allocation failure,
// TSDB_CODE_TSC_INTERNAL_ERROR if the wait semaphore cannot be initialised,
// the server's response code if the subscribe request was rejected, or
// TSDB_CODE_MND_CONSUMER_NOT_READY if the mnode never became ready.
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
  const int32_t   MAX_RETRY_COUNT = 120 * 2;  // let's wait for 2 mins at most
  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
  SMsgSendInfo*   sendInfo = NULL;
  SCMSubscribeReq req = {0};
  int32_t         code = 0;

  tscDebug("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz);

  req.consumerId = tmq->consumerId;
  tstrncpy(req.clientId, tmq->clientId, 256);
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
  req.topicNames = taosArrayInit(sz, sizeof(void*));

  if (req.topicNames == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  req.withTbName = tmq->withTbName;
  req.useSnapshot = tmq->useSnapshot;
  req.autoCommit = tmq->autoCommit;
  req.autoCommitInterval = tmq->autoCommitInterval;
  req.resetOffsetCfg = tmq->resetOffsetCfg;

  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);

    SName name = {0};
    tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));
    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      // without an error code the caller would see a successful subscribe
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }

    tNameExtractFullName(&name, topicFName);
    tscDebug("consumer:0x%" PRIx64 " subscribe topic:%s", tmq->consumerId, topicFName);

    if (taosArrayPush(req.topicNames, &topicFName) == NULL) {
      // the array did not take ownership, so the name must be freed here
      taosMemoryFree(topicFName);
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
  }

  // first pass with a NULL buffer only computes the encoded length
  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  SMqSubscribeCbParam param = { .rspErr = 0};
  if (tsem_init(&param.rspSem, 0, 0) != 0) {
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
    goto FAIL;
  }

  sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL};

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
  sendInfo->msgType = TDMT_MND_TMQ_SUBSCRIBE;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  // avoid double free if msg is sent
  buf = NULL;
  sendInfo = NULL;

  // tmqSubscribeCb posts the semaphore once the response (or an error) arrives
  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);

  if (param.rspErr != 0) {
    code = param.rspErr;
    goto FAIL;
  }

  int32_t retryCnt = 0;
  while (TSDB_CODE_MND_CONSUMER_NOT_READY == doAskEp(tmq)) {
    if (retryCnt++ > MAX_RETRY_COUNT) {
      tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt);
      code = TSDB_CODE_MND_CONSUMER_NOT_READY;
      goto FAIL;
    }

    tscDebug("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt);
    taosMsleep(500);
  }

  // init ep timer
  if (tmq->epTimer == NULL) {
    int64_t* pRefId1 = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId1 == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
    *pRefId1 = tmq->refId;
    tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, 1000, pRefId1, tmqMgmt.timer);
  }

  // init auto commit timer
  if (tmq->autoCommit && tmq->commitTimer == NULL) {
    int64_t* pRefId2 = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId2 == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
    *pRefId2 = tmq->refId;
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId2, tmqMgmt.timer);
  }

FAIL:
  // shared exit path for success and failure; buf/sendInfo are NULL once the message was sent
  taosArrayDestroyP(req.topicNames, taosMemoryFree);
  taosMemoryFree(buf);
  taosMemoryFree(sendInfo);

  return code;
}

L
Liu Jicong 已提交
1267
// Store the commit callback `cb` and its user argument `param` in the
// configuration; tmq_consumer_new() copies both into the consumer.
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
  conf->commitCbUserParam = param;
  conf->commitCb = cb;
}
1271

wmmhello's avatar
wmmhello 已提交
1272 1273 1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299
// Look up the vgroup entry with id `vgId` among the consumer's topics named
// `topicName` (exact strcmp match). Returns a pointer into the topic's vgroup
// array, or NULL when no such entry exists.
static SMqClientVg* getVgInfo(tmq_t* tmq, char* topicName, int32_t  vgId){
  const int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);

  for (int32_t t = 0; t < numOfTopics; ++t) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, t);
    if (strcmp(pTopic->topicName, topicName) != 0) {
      continue;
    }

    const int32_t numOfVgs = taosArrayGetSize(pTopic->vgs);
    for (int32_t v = 0; v < numOfVgs; ++v) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, v);
      if (pVg->vgId == vgId) {
        return pVg;
      }
    }
  }

  return NULL;
}

// Return the first entry of tmq->clientTopics whose topicName equals
// `topicName` (exact strcmp match), or NULL if there is none.
static SMqClientTopic* getTopicInfo(tmq_t* tmq, char* topicName){
  const int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);

  int32_t idx = 0;
  while (idx < numOfTopics) {
    SMqClientTopic* pCandidate = taosArrayGet(tmq->clientTopics, idx);
    if (strcmp(pCandidate->topicName, topicName) == 0) {
      return pCandidate;
    }
    ++idx;
  }

  return NULL;
}

D
dapan1121 已提交
1300
// Response callback for a poll request sent to a vnode.
//
// `param` is the heap-allocated SMqPollCbParam of the request; it is freed on
// every path. `pMsg->pData` is freed on every path; `pMsg->pEpSet` is either
// freed or handed over to the queued response wrapper.
//
// Outcomes:
//  - consumer already gone: frees everything, sets terrno to
//    TSDB_CODE_TMQ_CONSUMER_CLOSED and returns -1.
//  - non-zero `code`: on consumer mismatch the status becomes RECOVER; on
//    "no committed offset" an END_RSP wrapper is queued; other codes are
//    logged. All of them continue on the failure path and return -1.
//  - response from an earlier epoch: discarded, returns 0.
//  - otherwise the body after the SMqRspHead is decoded according to the
//    message type, the wrapper is queued on tmq->mqueue and 0 is returned.
//
// The failure path marks the vgroup IDLE again (if the request epoch still
// matches the consumer's epoch). Every path that acquired the consumer posts
// rspSem and releases the reference.
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;
  const int64_t   refId = pParam->refId;

  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq == NULL) {
    taosMemoryFree(pParam);
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    return -1;
  }

  const int32_t  epoch = pParam->epoch;
  const int32_t  vgId = pParam->vgId;
  const uint64_t requestId = pParam->requestId;

  if (code != 0) {
    if (pMsg->pData) taosMemoryFree(pMsg->pData);
    if (pMsg->pEpSet) taosMemoryFree(pMsg->pEpSet);

    if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
      // in case of consumer mismatch, flag the consumer for recovery
      atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
      tscDebug("consumer:0x%" PRIx64 " wait for the re-balance, wait for 500ms and set status to be RECOVER",
               tmq->consumerId);
    } else if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      SMqPollRspWrapper* pEndRsp = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
      if (pEndRsp == NULL) {
        tscWarn("consumer:0x%" PRIx64 " msg from vgId:%d discarded, epoch %d since out of memory, reqId:0x%" PRIx64,
                tmq->consumerId, vgId, epoch, requestId);
        goto CREATE_MSG_FAIL;
      }

      pEndRsp->tmqRspType = TMQ_MSG_TYPE__END_RSP;
      taosWriteQitem(tmq->mqueue, pEndRsp);
    } else {
      tscError("consumer:0x%" PRIx64 " msg from vgId:%d discarded, epoch %d, since %s, reqId:0x%" PRIx64, tmq->consumerId,
               vgId, epoch, tstrerror(code), requestId);
    }

    goto CREATE_MSG_FAIL;
  }

  SMqRspHead* pHead = (SMqRspHead*)pMsg->pData;
  int32_t     msgEpoch = pHead->epoch;
  int32_t     clientEpoch = atomic_load_32(&tmq->epoch);

  if (msgEpoch < clientEpoch) {
    // do not write into queue since updating epoch reset
    tscWarn("consumer:0x%" PRIx64
            " msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d, reqId:0x%" PRIx64,
            tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId);

    tsem_post(&tmq->rspSem);
    taosReleaseRef(tmqMgmt.rsetId, refId);

    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    taosMemoryFree(pParam);

    return 0;
  }

  if (msgEpoch != clientEpoch) {
    tscWarn("consumer:0x%" PRIx64 " mismatch rsp from vgId:%d, epoch %d, current epoch %d, reqId:0x%" PRIx64,
            tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId);
  }

  // handle meta rsp
  int8_t rspType = pHead->mqMsgType;

  SMqPollRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
  if (pWrapper == NULL) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, epoch %d since out of memory", tmq->consumerId, vgId,
            epoch);
    goto CREATE_MSG_FAIL;
  }

  pWrapper->tmqRspType = rspType;
  pWrapper->reqId = requestId;
  pWrapper->pEpset = pMsg->pEpSet;  // ownership of the ep set moves to the wrapper
  pWrapper->vgId = vgId;
  strcpy(pWrapper->topicName, pParam->topicName);

  pMsg->pEpSet = NULL;

  SDecoder decoder;
  switch (rspType) {
    case TMQ_MSG_TYPE__POLL_RSP: {
      tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
      tDecodeMqDataRsp(&decoder, &pWrapper->dataRsp);
      tDecoderClear(&decoder);
      // the raw response head is copied over the leading bytes of the decoded response
      memcpy(&pWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));

      char offsetStr[TSDB_OFFSET_LEN];
      tFormatOffset(offsetStr, TSDB_OFFSET_LEN, &pWrapper->dataRsp.rspOffset);
      tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req ver:%" PRId64 ", rsp:%s type %d, reqId:0x%" PRIx64,
               tmq->consumerId, vgId, pWrapper->dataRsp.reqOffset.version, offsetStr, rspType, requestId);
      break;
    }

    case TMQ_MSG_TYPE__POLL_META_RSP: {
      tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
      tDecodeMqMetaRsp(&decoder, &pWrapper->metaRsp);
      tDecoderClear(&decoder);
      memcpy(&pWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
      break;
    }

    case TMQ_MSG_TYPE__TAOSX_RSP: {
      tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
      tDecodeSTaosxRsp(&decoder, &pWrapper->taosxRsp);
      tDecoderClear(&decoder);
      memcpy(&pWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead));
      break;
    }

    default:  // invalid rspType
      tscError("consumer:0x%" PRIx64 " invalid rsp msg received, type:%d ignored", tmq->consumerId, rspType);
      break;
  }

  taosMemoryFree(pMsg->pData);
  taosWriteQitem(tmq->mqueue, pWrapper);

  int32_t total = taosQueueItemSize(tmq->mqueue);
  tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64,
           tmq->consumerId, rspType, vgId, total, requestId);

  tsem_post(&tmq->rspSem);
  taosReleaseRef(tmqMgmt.rsetId, refId);
  taosMemoryFree(pParam);

  return 0;

CREATE_MSG_FAIL:
  if (epoch == tmq->epoch) {
    // make the vgroup pollable again, under the consumer's write latch
    taosWLockLatch(&tmq->lock);
    SMqClientVg* pVg = getVgInfo(tmq, pParam->topicName, vgId);
    if (pVg != NULL) {
      atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
    }
    taosWUnLockLatch(&tmq->lock);
  }

  tsem_post(&tmq->rspSem);
  taosReleaseRef(tmqMgmt.rsetId, refId);
  taosMemoryFree(pParam);

  return -1;
}

H
Haojun Liao 已提交
1451 1452 1453 1454 1455
// Per-vgroup progress snapshot; initClientTopicFromRsp() looks entries up by
// topic/vgroup key and uses them to seed the rebuilt vgroup entry.
typedef struct SVgroupSaveInfo {
  STqOffsetVal offset;     // offset to resume from (seeds both current and committed offset)
  int64_t      numOfRows;  // row counter carried over to the new vgroup entry
} SVgroupSaveInfo;

H
Haojun Liao 已提交
1456 1457 1458 1459 1460 1461
// Fill the client-side topic `pTopic` from the endpoint information `pTopicEp`.
//
// The schema is moved out of `pTopicEp` (its schema fields are reset so the
// buffer has a single owner), topic and db names are copied, and one
// SMqClientVg is appended to pTopic->vgs per vgroup in the response. For each
// vgroup the saved offset / row count is taken from `pVgOffsetHashMap` when an
// entry exists under the topic/vgroup key; otherwise the offset starts from
// the consumer's reset-offset configuration with zero rows.
static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopicEp, SHashObj* pVgOffsetHashMap,
                                   tmq_t* tmq) {
  // move the schema: the response must not free it afterwards
  pTopic->schema = pTopicEp->schema;
  pTopicEp->schema.nCols = 0;
  pTopicEp->schema.pSchema = NULL;

  tstrncpy(pTopic->topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pTopic->db, pTopicEp->db, TSDB_DB_FNAME_LEN);

  int32_t numOfVgs = taosArrayGetSize(pTopicEp->vgs);
  tscDebug("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d", tmq->consumerId, pTopic->topicName, numOfVgs);
  pTopic->vgs = taosArrayInit(numOfVgs, sizeof(SMqClientVg));

  char vgKey[TSDB_TOPIC_FNAME_LEN + 22];
  for (int32_t idx = 0; idx < numOfVgs; ++idx) {
    SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, idx);

    makeTopicVgroupKey(vgKey, pTopic->topicName, pVgEp->vgId);
    SVgroupSaveInfo* pSaved = taosHashGet(pVgOffsetHashMap, vgKey, strlen(vgKey));

    // default: start from the configured reset offset with no rows consumed
    STqOffsetVal startOffset = {.type = tmq->resetOffsetCfg};
    int64_t      rows = 0;
    if (pSaved != NULL) {
      startOffset = pSaved->offset;
      rows = pSaved->numOfRows;
    }

    SMqClientVg clientVg = {0};
    clientVg.pollCnt = 0;
    clientVg.vgId = pVgEp->vgId;
    clientVg.epSet = pVgEp->epSet;
    clientVg.vgStatus = TMQ_VG_STATUS__IDLE;
    clientVg.vgSkipCnt = 0;
    clientVg.emptyBlockReceiveTs = 0;
    clientVg.numOfRows = rows;

    clientVg.offsetInfo.currentOffset = startOffset;
    clientVg.offsetInfo.committedOffset = startOffset;
    clientVg.offsetInfo.walVerBegin = -1;
    clientVg.offsetInfo.walVerEnd = -1;
    clientVg.seekUpdated = false;
    clientVg.receivedInfoFromVnode = false;

    taosArrayPush(pTopic->vgs, &clientVg);
  }
}

static void freeClientVgInfo(void* param) {
  SMqClientTopic* pTopic = param;
  if (pTopic->schema.nCols) {
    taosMemoryFreeClear(pTopic->schema.pSchema);
  }

  taosArrayDestroy(pTopic->vgs);
}

1514
// Replace the consumer's cached topic/vgroup table with the one carried by an ask-ep response.
//
// The current offset and row count of every locally known vgroup are saved first (keyed by
// topic name + vgId) and handed to initClientTopicFromRsp, so a vgroup that is still present in
// the new table keeps its consuming position.
//
// tmq    consumer whose table is updated
// epoch  epoch of the response; the update is applied only if it is newer than tmq->epoch
// pRsp   decoded ask-ep response (not modified, not freed)
//
// Returns true when the table was replaced and the epoch advanced; false when the response is
// not newer than the current epoch or an allocation failed (the consumer is left untouched).
static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);

  // the epoch is written with atomic_store_32 below, so read it the same way
  int32_t curEpoch = atomic_load_32(&tmq->epoch);

  char vgKey[TSDB_TOPIC_FNAME_LEN + 22];
  tscDebug("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d",
           tmq->consumerId, curEpoch, epoch, topicNumGet, topicNumCur);
  if (epoch <= curEpoch) {
    return false;
  }

  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    return false;
  }

  SHashObj* pVgOffsetHashMap = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pVgOffsetHashMap == NULL) {
    taosArrayDestroy(newTopics);
    return false;
  }

  // todo extract method
  // remember offset and row count of every vgroup we are currently consuming from
  for (int32_t i = 0; i < topicNumCur; i++) {
    // find old topic
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
    if (pTopicCur->vgs) {
      int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
      tscDebug("consumer:0x%" PRIx64 ", current vg num: %d", tmq->consumerId, vgNumCur);
      for (int32_t j = 0; j < vgNumCur; j++) {
        SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
        makeTopicVgroupKey(vgKey, pTopicCur->topicName, pVgCur->vgId);

        char buf[TSDB_OFFSET_LEN];
        tFormatOffset(buf, TSDB_OFFSET_LEN, &pVgCur->offsetInfo.currentOffset);
        tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId,
                 vgKey, buf);

        SVgroupSaveInfo info = {.offset = pVgCur->offsetInfo.currentOffset, .numOfRows = pVgCur->numOfRows};
        taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo));
      }
    }
  }

  // build the new table from the response, re-using the saved positions where the key matches
  for (int32_t i = 0; i < topicNumGet; i++) {
    SMqClientTopic topic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
    initClientTopicFromRsp(&topic, pTopicEp, pVgOffsetHashMap, tmq);
    if (taosArrayPush(newTopics, &topic) == NULL) {
      // the element was not copied into newTopics, so what it owns must be released here;
      // nothing of the consumer has been modified yet, the update is simply abandoned
      freeClientVgInfo(&topic);
      taosArrayDestroyEx(newTopics, freeClientVgInfo);
      taosHashCleanup(pVgOffsetHashMap);
      return false;
    }
  }

  taosHashCleanup(pVgOffsetHashMap);

  taosWLockLatch(&tmq->lock);
  // destroy current buffered existed topics info
  if (tmq->clientTopics) {
    taosArrayDestroyEx(tmq->clientTopics, freeClientVgInfo);
  }
  tmq->clientTopics = newTopics;
  taosWUnLockLatch(&tmq->lock);

  int8_t flag = (topicNumGet == 0) ? TMQ_CONSUMER_STATUS__NO_TOPIC : TMQ_CONSUMER_STATUS__READY;
  atomic_store_8(&tmq->status, flag);
  atomic_store_32(&tmq->epoch, epoch);

  tscDebug("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId);
  return true;
}

1585
// Callback invoked with the response of an ask-ep (topic endpoint) request.
//
// param  SMqAskEpCbParam of the request; freed here on every path
// pMsg   response buffer; pMsg->pData and pMsg->pEpSet are freed here on every path
// code   result code of the request
//
// The consumer is looked up through its refId. If it no longer exists, terrno is set to
// TSDB_CODE_TMQ_CONSUMER_CLOSED, the user callback is NOT invoked and that code is returned.
// Otherwise the user callback stored in param is invoked (with a NULL buffer when code reports
// a failure) and code is returned.
int32_t askEpCallbackFn(void* param, SDataBuf* pMsg, int32_t code) {
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
  tmq_t*           tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);
  SMqRspHead*      head = NULL;
  int32_t          epoch = 0;

  if (tmq == NULL) {
    code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    terrno = code;
    goto _end;
  }

  if (code != TSDB_CODE_SUCCESS) {
    tscError("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code));
    pParam->pUserFn(tmq, code, NULL, pParam->pParam);

    taosReleaseRef(tmqMgmt.rsetId, pParam->refId);
    goto _end;
  }

  // tmq's epoch is monotonically increase,
  // so it's safe to discard any old epoch msg.
  // Epoch will only increase when received newer epoch ep msg
  head = pMsg->pData;
  epoch = atomic_load_32(&tmq->epoch);
  if (head->epoch <= epoch) {
    tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, no need to update local ep",
             tmq->consumerId, head->epoch, epoch);

    // a recovering consumer leaves the RECOVER state even though the table itself is unchanged
    if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
      // zero-initialised so that the size query and the delete below never see indeterminate
      // fields should the decode not fill the structure
      SMqAskEpRsp rsp = {0};
      tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
      int8_t flag = (taosArrayGetSize(rsp.topics) == 0) ? TMQ_CONSUMER_STATUS__NO_TOPIC : TMQ_CONSUMER_STATUS__READY;
      atomic_store_8(&tmq->status, flag);
      tDeleteSMqAskEpRsp(&rsp);
    }

  } else {
    tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId,
             head->epoch, epoch);
  }

  pParam->pUserFn(tmq, code, pMsg, pParam->pParam);
  taosReleaseRef(tmqMgmt.rsetId, pParam->refId);

_end:
  // single exit: the response buffers and the callback parameter are always owned by this function
  taosMemoryFree(pMsg->pEpSet);
  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pParam);
  return code;
}

L
Liu Jicong 已提交
1642
// Fill a poll request addressed to one vgroup of a subscribed topic.
//
// pReq     request to fill
// tmq      consumer issuing the request (group id, consumer id, epoch and options are taken from it)
// timeout  value stored in the request as-is
// pTopic   topic being consumed; its name becomes the second half of the subscription key
// pVg      target vgroup; supplies the vgId and the offset to consume from
void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
  // subscription key: <groupId><TMQ_SEPARATOR><topicName>
  int32_t sepPos = strlen(tmq->groupId);
  memcpy(pReq->subKey, tmq->groupId, sepPos);
  pReq->subKey[sepPos] = TMQ_SEPARATOR;
  // NOTE(review): unbounded copy -- presumably subKey is sized for group id + separator + topic name; confirm
  strcpy(&pReq->subKey[sepPos + 1], pTopic->topicName);

  // who is asking, and which vgroup is asked
  pReq->consumerId = tmq->consumerId;
  pReq->epoch = tmq->epoch;
  pReq->head.vgId = pVg->vgId;

  // what is asked for
  pReq->reqOffset = pVg->offsetInfo.currentOffset;
  pReq->timeout = timeout;
  pReq->withTbName = tmq->withTbName;
  pReq->useSnapshot = tmq->useSnapshot;

  pReq->reqId = generateRequestId();
}

L
Liu Jicong 已提交
1658 1659
// Build the result object returned to the caller for a meta poll response.
//
// pWrapper  poll response whose topicHandle and vgHandle have already been resolved
//
// Returns a newly allocated SMqMetaRspObj tagged RES_TYPE__TMQ_META that carries the topic name,
// db name, vgId and a bytewise copy of the meta response; NULL if the allocation fails.
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
  if (pRspObj == NULL) {
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ_META;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;

  memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp));
  return pRspObj;
}

1669
// Build the result object returned to the caller for a data poll response.
//
// pWrapper   poll response whose topicHandle and vgHandle have already been resolved
// pVg        vgroup the response came from; its numOfRows is increased by the rows in this response
// numOfRows  out: reset to 0, then set to the number of rows carried by this response
//
// Returns a newly allocated SMqRspObj tagged RES_TYPE__TMQ holding a bytewise copy of the data
// response; NULL if the allocation fails (pVg is then left unchanged and *numOfRows is 0).
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) {
  (*numOfRows) = 0;

  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  if (pRspObj == NULL) {
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ;

  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);

  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;

  // when the response does not carry its own schema, fall back to the one cached for the topic
  if (!pWrapper->dataRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  // extract the rows in this data packet
  for (int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) {
    SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pRspObj->rsp.blockData, i);
    int64_t            rows = htobe64(pRetrieve->numOfRows);
    pVg->numOfRows += rows;
    (*numOfRows) += rows;
  }

  return pRspObj;
}

1699
// Build the result object returned to the caller for a taosx (meta + data) poll response.
//
// pWrapper   poll response whose topicHandle and vgHandle have already been resolved
// pVg        vgroup the response came from; its numOfRows is increased by the rows in this response
// numOfRows  in/out: increased by the number of rows carried by this response (not reset here)
//
// Returns a newly allocated SMqTaosxRspObj tagged RES_TYPE__TMQ_METADATA holding a bytewise copy
// of the taosx response; NULL if the allocation fails (pVg and *numOfRows are then left unchanged).
SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) {
  SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj));
  if (pRspObj == NULL) {
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ_METADATA;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;

  // when the response does not carry its own schema, fall back to the one cached for the topic
  if (!pWrapper->taosxRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  // extract the rows in this data packet
  for (int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) {
    SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pRspObj->rsp.blockData, i);
    int64_t            rows = htobe64(pRetrieve->numOfRows);
    pVg->numOfRows += rows;
    (*numOfRows) += rows;
  }
  return pRspObj;
}

1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756
// Common failure exit for errors hit while a poll request is still being prepared:
// puts the vgroup back into the idle state, posts the consumer's rspSem and reports failure.
// Always returns -1.
static int32_t handleErrorBeforePoll(SMqClientVg* pVg, tmq_t* pTmq) {
  const int32_t failed = -1;

  atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  tsem_post(&pTmq->rspSem);

  return failed;
}

// Serialize and send one poll request to a single vgroup.
//
// pTmq     consumer issuing the poll
// pTopic   topic the vgroup belongs to
// pVg      target vgroup
// timeout  forwarded into the request
//
// The serialized request and the callback parameter are handed to the send-info object once it
// has been filled in. On any failure before that point everything allocated here is released and
// the result of handleErrorBeforePoll (-1) is returned. Returns TSDB_CODE_SUCCESS after the request
// has been passed to asyncSendMsgToServer.
static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* pVg, int64_t timeout) {
  char*           msg = NULL;
  SMqPollCbParam* pParam = NULL;
  SMsgSendInfo*   sendInfo = NULL;
  int32_t         msgSize = 0;
  int64_t         transporterId = 0;
  char            offsetFormatBuf[TSDB_OFFSET_LEN];

  SMqPollReq req = {0};
  tmqBuildConsumeReqImpl(&req, pTmq, timeout, pTopic, pVg);

  // first pass only computes the encoded size
  msgSize = tSerializeSMqPollReq(NULL, 0, &req);
  if (msgSize < 0) {
    goto _error;
  }

  msg = taosMemoryCalloc(1, msgSize);
  if (NULL == msg) {
    goto _error;
  }

  if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
    goto _error;
  }

  pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
  if (pParam == NULL) {
    goto _error;
  }

  // the callback identifies topic and vgroup by name/id instead of by pointer, because the
  // topic/vgroup objects may have been released by the time the response arrives
  pParam->refId = pTmq->refId;
  pParam->epoch = pTmq->epoch;
  tstrncpy(pParam->topicName, pTopic->topicName, sizeof(pParam->topicName));  // bounded, always terminated
  pParam->vgId = pVg->vgId;
  pParam->requestId = req.reqId;

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    goto _error;
  }

  sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
  sendInfo->requestId = req.reqId;
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqPollCb;
  sendInfo->msgType = TDMT_VND_TMQ_CONSUME;

  tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.currentOffset);

  tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, pTmq->consumerId,
           pTopic->topicName, pVg->vgId, pTmq->epoch, offsetFormatBuf, req.reqId);
  // NOTE(review): the result of the send is not inspected here -- confirm how a failed send is reported
  asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);

  pVg->pollCnt++;
  pVg->seekUpdated = false;   // reset this flag.
  pTmq->pollCnt++;

  return TSDB_CODE_SUCCESS;

_error:
  // only reached before sendInfo exists, so msg and pParam are still owned by this function
  taosMemoryFree(pParam);
  taosMemoryFree(msg);
  return handleErrorBeforePoll(pVg, pTmq);
}

1792
// broadcast the poll request to all related vnodes
H
Haojun Liao 已提交
1793
// Walk every vgroup of every subscribed topic and send a poll request to each one that is idle.
//
// A vgroup is skipped when it returned an empty block less than EMPTY_BLOCK_POLL_IDLE_DURATION ms
// ago, or when a previous poll to it is still waiting for its response.
//
// Returns 0 when the walk completes (or immediately while the consumer is in the RECOVER state);
// otherwise the first non-success code returned by doTmqPollImpl, in which case the remaining
// vgroups are not polled.
static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
    return 0;
  }

  int32_t topicCnt = taosArrayGetSize(tmq->clientTopics);
  tscDebug("consumer:0x%" PRIx64 " start to poll data, numOfTopics:%d", tmq->consumerId, topicCnt);

  for (int topicIdx = 0; topicIdx < topicCnt; topicIdx++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, topicIdx);
    int32_t         vgCnt = taosArrayGetSize(pTopic->vgs);

    for (int vgIdx = 0; vgIdx < vgCnt; vgIdx++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, vgIdx);

      // back off briefly after an empty block
      if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) {
        tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId,
                 tmq->epoch, pVg->vgId);
        continue;
      }

      // claim the vgroup: IDLE -> WAIT; the previous value tells whether a poll is already in flight
      int32_t prevStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
      if (prevStatus != TMQ_VG_STATUS__WAIT) {
        atomic_store_32(&pVg->vgSkipCnt, 0);

        int32_t code = doTmqPollImpl(tmq, pTopic, pVg, timeout);
        if (code != TSDB_CODE_SUCCESS) {
          return code;
        }
      } else {
        int32_t skipped = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
        tscTrace("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch,
                 pVg->vgId, skipped);
      }
    }
  }

  tscDebug("consumer:0x%" PRIx64 " end to poll data", tmq->consumerId);
  return 0;
}

H
Haojun Liao 已提交
1832
// Handle a queued response that is not a poll response.
//
// Only TMQ_MSG_TYPE__EP_RSP is understood: when its epoch is newer than the consumer's, the local
// endpoint table is updated from it and *pReset is set to true; otherwise the response content is
// released and *pReset is set to false.
//
// Returns 0 for an endpoint response, -1 for any other type (*pReset is then not written).
// The wrapper item itself is not freed here.
static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType != TMQ_MSG_TYPE__EP_RSP) {
    return -1;
  }

  bool isNewer = rspWrapper->epoch > atomic_load_32(&tmq->epoch);
  if (isNewer) {
    SMqAskEpRsp* pEpRsp = &((SMqAskEpRspWrapper*)rspWrapper)->msg;
    doUpdateLocalEp(tmq, rspWrapper->epoch, pEpRsp);
    tDeleteSMqAskEpRsp(pEpRsp);
  } else {
    tmqFreeRspWrapper(rspWrapper);
  }

  *pReset = isNewer;
  return 0;
}

H
Haojun Liao 已提交
1852
static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
H
Haojun Liao 已提交
1853
  tscDebug("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, tmq->qall->numOfItems);
1854

X
Xiaoyu Wang 已提交
1855
  while (1) {
1856 1857
    SMqRspWrapper* pRspWrapper = NULL;
    taosGetQitem(tmq->qall, (void**)&pRspWrapper);
1858

1859
    if (pRspWrapper == NULL) {
L
Liu Jicong 已提交
1860
      taosReadAllQitems(tmq->mqueue, tmq->qall);
1861 1862
      taosGetQitem(tmq->qall, (void**)&pRspWrapper);
      if (pRspWrapper == NULL) {
L
Liu Jicong 已提交
1863 1864
        return NULL;
      }
X
Xiaoyu Wang 已提交
1865 1866
    }

X
Xiaoyu Wang 已提交
1867
    tscDebug("consumer:0x%" PRIx64 " handle rsp, type:%d", tmq->consumerId, pRspWrapper->tmqRspType);
H
Haojun Liao 已提交
1868

1869 1870
    if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
      taosFreeQitem(pRspWrapper);
L
Liu Jicong 已提交
1871
      terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
H
Haojun Liao 已提交
1872
      tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId, tstrerror(terrno));
L
Liu Jicong 已提交
1873
      return NULL;
1874 1875
    } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
H
Haojun Liao 已提交
1876

X
Xiaoyu Wang 已提交
1877
      int32_t     consumerEpoch = atomic_load_32(&tmq->epoch);
1878 1879 1880
      SMqDataRsp* pDataRsp = &pollRspWrapper->dataRsp;

      if (pDataRsp->head.epoch == consumerEpoch) {
wmmhello's avatar
wmmhello 已提交
1881 1882 1883 1884 1885 1886 1887 1888
        SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
        pollRspWrapper->vgHandle = pVg;
        pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName);
        if(pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL){
          tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
                   pollRspWrapper->topicName, pollRspWrapper->vgId);
          return NULL;
        }
1889 1890 1891 1892
        // update the epset
        if (pollRspWrapper->pEpset != NULL) {
          SEp* pEp = GET_ACTIVE_EP(pollRspWrapper->pEpset);
          SEp* pOld = GET_ACTIVE_EP(&(pVg->epSet));
X
Xiaoyu Wang 已提交
1893 1894
          tscDebug("consumer:0x%" PRIx64 " update epset vgId:%d, ep:%s:%d, old ep:%s:%d", tmq->consumerId, pVg->vgId,
                   pEp->fqdn, pEp->port, pOld->fqdn, pOld->port);
1895 1896 1897
          pVg->epSet = *pollRspWrapper->pEpset;
        }

1898 1899 1900
        // update the local offset value only for the returned values, only when the local offset is NOT updated
        // by tmq_offset_seek function
        if (!pVg->seekUpdated) {
T
t_max 已提交
1901
          tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", tmq->consumerId);
1902
          pVg->offsetInfo.currentOffset = pDataRsp->rspOffset;
T
t_max 已提交
1903 1904
        } else {
          tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", tmq->consumerId);
wmmhello's avatar
wmmhello 已提交
1905
        }
1906 1907

        // update the status
X
Xiaoyu Wang 已提交
1908
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
H
Haojun Liao 已提交
1909

1910 1911 1912
        // update the valid wal version range
        pVg->offsetInfo.walVerBegin = pDataRsp->head.walsver;
        pVg->offsetInfo.walVerEnd = pDataRsp->head.walever;
1913
        pVg->receivedInfoFromVnode = true;
1914

1915 1916
        char buf[TSDB_OFFSET_LEN];
        tFormatOffset(buf, TSDB_OFFSET_LEN, &pDataRsp->rspOffset);
1917
        if (pDataRsp->blockNum == 0) {
X
Xiaoyu Wang 已提交
1918 1919 1920
          tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64
                   " total:%" PRId64 " reqId:0x%" PRIx64,
                   tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId);
1921
          pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
1922
          pVg->emptyBlockReceiveTs = taosGetTimestampMs();
L
Liu Jicong 已提交
1923
          taosFreeQitem(pollRspWrapper);
1924
        } else {  // build rsp
X
Xiaoyu Wang 已提交
1925
          int64_t    numOfRows = 0;
1926
          SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
H
Haojun Liao 已提交
1927
          tmq->totalRows += numOfRows;
1928
          pVg->emptyBlockReceiveTs = 0;
H
Haojun Liao 已提交
1929
          tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
H
Haojun Liao 已提交
1930
                   " vg total:%" PRId64 " total:%" PRId64 ", reqId:0x%" PRIx64,
H
Haojun Liao 已提交
1931
                   tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, tmq->totalRows,
H
Haojun Liao 已提交
1932
                   pollRspWrapper->reqId);
1933 1934 1935
          taosFreeQitem(pollRspWrapper);
          return pRsp;
        }
X
Xiaoyu Wang 已提交
1936
      } else {
H
Haojun Liao 已提交
1937
        tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
1938
                 tmq->consumerId, pollRspWrapper->vgId, pDataRsp->head.epoch, consumerEpoch);
1939
        pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
L
Liu Jicong 已提交
1940 1941
        taosFreeQitem(pollRspWrapper);
      }
1942
    } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
1943
      // todo handle the wal range and epset for each vgroup
1944
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
L
Liu Jicong 已提交
1945
      int32_t            consumerEpoch = atomic_load_32(&tmq->epoch);
1946 1947 1948

      tscDebug("consumer:0x%" PRIx64 " process meta rsp", tmq->consumerId);

L
Liu Jicong 已提交
1949
      if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) {
wmmhello's avatar
wmmhello 已提交
1950 1951 1952 1953 1954 1955 1956 1957
        SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
        pollRspWrapper->vgHandle = pVg;
        pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName);
        if(pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL){
          tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
                   pollRspWrapper->topicName, pollRspWrapper->vgId);
          return NULL;
        }
H
Haojun Liao 已提交
1958

1959
        if(pollRspWrapper->metaRsp.rspOffset.type != 0){    // if offset is validate
H
Haojun Liao 已提交
1960
          pVg->offsetInfo.currentOffset = pollRspWrapper->metaRsp.rspOffset;
1961
        }
1962

L
Liu Jicong 已提交
1963 1964
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        // build rsp
L
Liu Jicong 已提交
1965
        SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
L
Liu Jicong 已提交
1966 1967 1968
        taosFreeQitem(pollRspWrapper);
        return pRsp;
      } else {
H
Haojun Liao 已提交
1969
        tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
1970
                 tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
1971
        pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
L
Liu Jicong 已提交
1972
        taosFreeQitem(pollRspWrapper);
X
Xiaoyu Wang 已提交
1973
      }
1974 1975
    } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) {
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
X
Xiaoyu Wang 已提交
1976
      int32_t            consumerEpoch = atomic_load_32(&tmq->epoch);
H
Haojun Liao 已提交
1977

L
Liu Jicong 已提交
1978
      if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) {
wmmhello's avatar
wmmhello 已提交
1979 1980 1981 1982 1983 1984 1985 1986
        SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
        pollRspWrapper->vgHandle = pVg;
        pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName);
        if(pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL){
          tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
                   pollRspWrapper->topicName, pollRspWrapper->vgId);
          return NULL;
        }
H
Haojun Liao 已提交
1987

T
t_max 已提交
1988 1989 1990 1991 1992 1993 1994 1995 1996
        // update the local offset value only for the returned values, only when the local offset is NOT updated
        // by tmq_offset_seek function
        if (!pVg->seekUpdated) {
          if(pollRspWrapper->taosxRsp.rspOffset.type != 0) {    // if offset is validate
            tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", tmq->consumerId);
            pVg->offsetInfo.currentOffset = pollRspWrapper->taosxRsp.rspOffset;
          }
        } else {
          tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", tmq->consumerId);
1997
        }
1998

L
Liu Jicong 已提交
1999
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
H
Haojun Liao 已提交
2000

L
Liu Jicong 已提交
2001
        if (pollRspWrapper->taosxRsp.blockNum == 0) {
H
Haojun Liao 已提交
2002 2003
          tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 " reqId:0x%" PRIx64,
                   tmq->consumerId, pVg->vgId, pVg->numOfRows, pollRspWrapper->reqId);
H
Haojun Liao 已提交
2004
          pVg->emptyBlockReceiveTs = taosGetTimestampMs();
2005
          pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
H
Haojun Liao 已提交
2006
          taosFreeQitem(pollRspWrapper);
L
Liu Jicong 已提交
2007
          continue;
H
Haojun Liao 已提交
2008
        } else {
X
Xiaoyu Wang 已提交
2009
          pVg->emptyBlockReceiveTs = 0;  // reset the ts
L
Liu Jicong 已提交
2010
        }
wmmhello's avatar
wmmhello 已提交
2011

L
Liu Jicong 已提交
2012
        // build rsp
X
Xiaoyu Wang 已提交
2013
        void*   pRsp = NULL;
2014
        int64_t numOfRows = 0;
L
Liu Jicong 已提交
2015
        if (pollRspWrapper->taosxRsp.createTableNum == 0) {
2016
          pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
L
Liu Jicong 已提交
2017
        } else {
2018
          pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
wmmhello's avatar
wmmhello 已提交
2019
        }
H
Haojun Liao 已提交
2020

2021 2022
        tmq->totalRows += numOfRows;

2023 2024
        char buf[TSDB_OFFSET_LEN];
        tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.currentOffset);
H
Haojun Liao 已提交
2025
        tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
X
Xiaoyu Wang 已提交
2026
                 ", vg total:%" PRId64 " total:%" PRId64 " reqId:0x%" PRIx64,
H
Haojun Liao 已提交
2027
                 tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows,
H
Haojun Liao 已提交
2028
                 tmq->totalRows, pollRspWrapper->reqId);
H
Haojun Liao 已提交
2029 2030

        taosFreeQitem(pollRspWrapper);
L
Liu Jicong 已提交
2031
        return pRsp;
H
Haojun Liao 已提交
2032

L
Liu Jicong 已提交
2033
      } else {
H
Haojun Liao 已提交
2034
        tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
2035
                 tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
2036
        pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
L
Liu Jicong 已提交
2037 2038
        taosFreeQitem(pollRspWrapper);
      }
X
Xiaoyu Wang 已提交
2039
    } else {
H
Haojun Liao 已提交
2040 2041
      tscDebug("consumer:0x%" PRIx64 " not data msg received", tmq->consumerId);

X
Xiaoyu Wang 已提交
2042
      bool reset = false;
2043 2044
      tmqHandleNoPollRsp(tmq, pRspWrapper, &reset);
      taosFreeQitem(pRspWrapper);
X
Xiaoyu Wang 已提交
2045
      if (pollIfReset && reset) {
2046
        tscDebug("consumer:0x%" PRIx64 ", reset and repoll", tmq->consumerId);
2047
        tmqPollImpl(tmq, timeout);
X
Xiaoyu Wang 已提交
2048 2049 2050 2051 2052
      }
    }
  }
}

2053
// Poll the consumer for the next result.
//
// tmq      consumer to poll
// timeout  maximum time to wait, in milliseconds; a negative value waits without a deadline
//          (in 1000 ms slices)
//
// Returns a result object as soon as one is available. Returns NULL when the consumer is still in
// the INIT state (after sleeping 500 ms), when the consumer does not become ready after the
// ask-ep retries, when an end response was seen (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET), or
// when the timeout elapses without a result.
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
  int64_t startTime = taosGetTimestampMs();

  tscDebug("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime,
           timeout);

  // in no topic status, delayed task also need to be processed
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
    tscDebug("consumer:0x%" PRIx64 " poll return since consumer is init", tmq->consumerId);
    taosMsleep(500);  //     sleep for a while
    return NULL;
  }

  // a recovering consumer must fetch its endpoints before anything can be polled
  while (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
    int32_t attempts = 0;
    while (TSDB_CODE_MND_CONSUMER_NOT_READY == doAskEp(tmq)) {
      if (attempts++ > 40) {
        return NULL;
      }

      tscDebug("consumer:0x%" PRIx64 " not ready, retry:%d/40 in 500ms", tmq->consumerId, attempts);
      taosMsleep(500);
    }
  }

  for (;;) {
    tmqHandleAllDelayedTask(tmq);

    // a failed poll round is only logged; responses already queued are still processed below
    if (tmqPollImpl(tmq, timeout) < 0) {
      tscDebug("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId);
    }

    void* rspObj = tmqHandleAllRsp(tmq, timeout, false);
    if (rspObj) {
      tscDebug("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj);
      return (TAOS_RES*)rspObj;
    }

    if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      tscDebug("consumer:0x%" PRIx64 " return null since no committed offset", tmq->consumerId);
      return NULL;
    }

    if (timeout < 0) {
      // use tsem_timewait instead of tsem_wait to avoid unexpected stuck
      tsem_timewait(&tmq->rspSem, 1000);
      continue;
    }

    int64_t currentTime = taosGetTimestampMs();
    int64_t elapsedTime = currentTime - startTime;
    if (elapsedTime > timeout) {
      tscDebug("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
               tmq->consumerId, tmq->epoch, startTime, currentTime);
      return NULL;
    }

    // wait for a response for what is left of the budget
    tsem_timewait(&tmq->rspSem, (timeout - elapsedTime));
  }
}

2111 2112 2113 2114 2115 2116 2117 2118 2119 2120 2121 2122 2123 2124
// Writes the consumer's poll/row counters to the debug log, followed by the
// row count recorded for every vgroup of every subscribed topic.
static void displayConsumeStatistics(const tmq_t* pTmq) {
  int32_t topicCnt = taosArrayGetSize(pTmq->clientTopics);

  tscDebug("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d",
           pTmq->consumerId, pTmq->pollCnt, pTmq->totalRows, topicCnt, pTmq->epoch);
  tscDebug("consumer:0x%" PRIx64 " rows dist begin: ", pTmq->consumerId);

  int32_t topicIdx = 0;
  while (topicIdx < topicCnt) {
    SMqClientTopic* pTopic = taosArrayGet(pTmq->clientTopics, topicIdx);
    tscDebug("consumer:0x%" PRIx64 " topic:%d", pTmq->consumerId, topicIdx);

    // one line per vgroup of this topic
    int32_t vgCnt = taosArrayGetSize(pTopic->vgs);
    int32_t vgIdx = 0;
    while (vgIdx < vgCnt) {
      SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, vgIdx);
      tscDebug("topic:%s, %d. vgId:%d rows:%" PRId64, pTopic->topicName, vgIdx, pClientVg->vgId,
               pClientVg->numOfRows);
      vgIdx += 1;
    }

    topicIdx += 1;
  }

  tscDebug("consumer:0x%" PRIx64 " rows dist end", pTmq->consumerId);
}
2130

2131
// Closes a consumer.
//
// When the consumer is in the READY state:
//   1. if auto commit is enabled, offsets are committed synchronously first;
//      a commit failure is returned to the caller and the consumer is NOT
//      released;
//   2. tmq_subscribe() is called with a freshly created (empty) topic list,
//      retrying every 500ms while the mnode answers
//      TSDB_CODE_MND_CONSUMER_NOT_READY (presumably this drops all
//      subscriptions -- confirm against tmq_subscribe).
// In any other state the consumer is released directly.
//
// Returns 0 on success, or the error code of the failed auto commit.
int32_t tmq_consumer_close(tmq_t* tmq) {
  tscDebug("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status);
  displayConsumeStatistics(tmq);

  if (tmq->status != TMQ_CONSUMER_STATUS__READY) {
    tscWarn("consumer:0x%" PRIx64 " not in ready state, close it directly", tmq->consumerId);
  } else {
    // if auto commit is set, commit before close consumer. Otherwise, do nothing.
    if (tmq->autoCommit) {
      int32_t commitCode = tmq_commit_sync(tmq, NULL);
      if (commitCode != 0) {
        return commitCode;
      }
    }

    tmq_list_t* emptyTopics = tmq_list_new();
    for (int32_t attempt = 0;; ++attempt) {
      int32_t subCode = tmq_subscribe(tmq, emptyTopics);
      if (subCode != TSDB_CODE_MND_CONSUMER_NOT_READY || attempt > 5) {
        break;
      }
      taosMsleep(500);
    }
    tmq_list_destroy(emptyTopics);
  }

  taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
  return 0;
}
L
Liu Jicong 已提交
2164

L
Liu Jicong 已提交
2165 2166
// Maps a tmq return code to a readable string: 0 -> "success", -1 -> "fail",
// any other value is delegated to tstrerror().
const char* tmq_err2str(int32_t err) {
  switch (err) {
    case 0:
      return "success";
    case -1:
      return "fail";
    default:
      return tstrerror(err);
  }
}
L
Liu Jicong 已提交
2174

L
Liu Jicong 已提交
2175 2176 2177 2178 2179
// Classifies a result handle as data, table meta or meta+data.
// Returns TMQ_RES_INVALID when the handle matches none of the tmq result kinds.
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return TMQ_RES_DATA;
  }

  if (TD_RES_TMQ_META(res)) {
    return TMQ_RES_TABLE_META;
  }

  if (TD_RES_TMQ_METADATA(res)) {
    return TMQ_RES_METADATA;
  }

  return TMQ_RES_INVALID;
}

L
Liu Jicong 已提交
2187
// Returns the topic name carried by a tmq result, without its leading
// "<prefix>." part (the part up to and including the first '.').
//
// If the stored name contains no '.', the whole stored name is returned; the
// previous code returned `strchr(...) + 1`, i.e. an invalid pointer, in that
// case. Returns NULL when `res` is not a tmq result. The returned pointer
// refers to storage owned by `res`.
const char* tmq_get_topic_name(TAOS_RES* res) {
  const char* fullName = NULL;

  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->topic;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->topic;
  } else if (TD_RES_TMQ_METADATA(res)) {
    fullName = ((SMqTaosxRspObj*)res)->topic;
  } else {
    return NULL;
  }

  const char* dot = strchr(fullName, '.');
  return (dot != NULL) ? dot + 1 : fullName;
}

L
Liu Jicong 已提交
2202 2203 2204 2205
// Returns the database name carried by a tmq result, without its leading
// "<prefix>." part (the part up to and including the first '.').
//
// If the stored name contains no '.', the whole stored name is returned; the
// previous code returned `strchr(...) + 1`, i.e. an invalid pointer, in that
// case. Returns NULL when `res` is not a tmq result. The returned pointer
// refers to storage owned by `res`.
const char* tmq_get_db_name(TAOS_RES* res) {
  const char* fullName = NULL;

  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->db;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->db;
  } else if (TD_RES_TMQ_METADATA(res)) {
    fullName = ((SMqTaosxRspObj*)res)->db;
  } else {
    return NULL;
  }

  const char* dot = strchr(fullName, '.');
  return (dot != NULL) ? dot + 1 : fullName;
}

L
Liu Jicong 已提交
2217 2218 2219 2220
// Returns the vgroup id recorded in a tmq result, or -1 when `res` is not a
// tmq result.
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return ((SMqRspObj*)res)->vgId;
  }

  if (TD_RES_TMQ_META(res)) {
    return ((SMqMetaRspObj*)res)->vgId;
  }

  if (TD_RES_TMQ_METADATA(res)) {
    return ((SMqTaosxRspObj*)res)->vgId;
  }

  return -1;
}
L
Liu Jicong 已提交
2231

2232 2233 2234 2235 2236 2237 2238 2239 2240 2241 2242 2243 2244 2245 2246 2247 2248 2249 2250 2251 2252 2253 2254
// Returns the offset version carried by a tmq result when its response
// offset is of type TMQ_OFFSET__LOG. For any other offset type, or when `res`
// is not a tmq result, returns -1.
int64_t tmq_get_vgroup_offset(TAOS_RES* res) {
  // data from tsdb, no valid offset info
  const int64_t noOffset = -1;

  if (TD_RES_TMQ(res)) {
    SMqRspObj* pData = (SMqRspObj*)res;
    if (pData->rsp.rspOffset.type != TMQ_OFFSET__LOG) {
      return noOffset;
    }
    return pData->rsp.rspOffset.version;
  }

  if (TD_RES_TMQ_META(res)) {
    SMqMetaRspObj* pMeta = (SMqMetaRspObj*)res;
    if (pMeta->metaRsp.rspOffset.type != TMQ_OFFSET__LOG) {
      return noOffset;
    }
    return pMeta->metaRsp.rspOffset.version;
  }

  if (TD_RES_TMQ_METADATA(res)) {
    SMqTaosxRspObj* pTaosx = (SMqTaosxRspObj*)res;
    if (pTaosx->rsp.rspOffset.type != TMQ_OFFSET__LOG) {
      return noOffset;
    }
    return pTaosx->rsp.rspOffset.version;
  }

  return noOffset;
}

L
Liu Jicong 已提交
2255 2256 2257 2258 2259 2260 2261
// Returns the table name of the block the result iterator currently points
// at. Only data and meta+data results are handled. NULL is returned when the
// response carries no table names, when the iterator is outside
// [0, blockNum), or for any other kind of result.
const char* tmq_get_table_name(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pData = (SMqRspObj*)res;
    bool       hasNames = pData->rsp.withTbName && pData->rsp.blockTbName != NULL;
    bool       inRange = pData->resIter >= 0 && pData->resIter < pData->rsp.blockNum;
    if (hasNames && inRange) {
      return (const char*)taosArrayGetP(pData->rsp.blockTbName, pData->resIter);
    }
    return NULL;
  }

  if (TD_RES_TMQ_METADATA(res)) {
    SMqTaosxRspObj* pTaosx = (SMqTaosxRspObj*)res;
    bool            hasNames = pTaosx->rsp.withTbName && pTaosx->rsp.blockTbName != NULL;
    bool            inRange = pTaosx->resIter >= 0 && pTaosx->resIter < pTaosx->rsp.blockNum;
    if (hasNames && inRange) {
      return (const char*)taosArrayGetP(pTaosx->rsp.blockTbName, pTaosx->resIter);
    }
    return NULL;
  }

  return NULL;
}
2273

2274 2275 2276 2277
// Starts an asynchronous commit. With a result handle only the offset of that
// result is committed; with pRes == NULL all offsets are committed. `cb` and
// `param` are forwarded to the commit routines unchanged.
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* param) {
  if (pRes != NULL) {
    // only commit one offset
    asyncCommitOffset(tmq, pRes, TDMT_VND_TMQ_COMMIT_OFFSET, cb, param);
    return;
  }

  // here needs to commit all offsets.
  asyncCommitAllOffsets(tmq, cb, param);
}

2282
// Commit callback used by the synchronous wrappers: stores the result code in
// the SSyncCommitInfo passed as `param` and posts its semaphore so the waiting
// caller can continue. The tmq handle is not used.
static void commitCallBackFn(tmq_t *UNUSED_PARAM(tmq), int32_t code, void* param) {
  SSyncCommitInfo* pSync = param;

  pSync->code = code;
  tsem_post(&pSync->sem);
}
2287

2288 2289 2290 2291 2292 2293 2294 2295 2296
// Commits offsets and blocks until the commit has completed.
//
// pRes == NULL commits all offsets of the consumer; otherwise only the offset
// of the given result is committed. The asynchronous commit is started with
// commitCallBackFn, which stores the result code and posts the semaphore this
// function waits on.
//
// Returns the commit result code, or TSDB_CODE_OUT_OF_MEMORY when the wait
// context cannot be allocated (the allocation result was previously
// dereferenced without a check).
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
  SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
  if (pInfo == NULL) {
    tscError("consumer:0x%" PRIx64 " failed to prepare sync commit, out of memory", tmq->consumerId);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tsem_init(&pInfo->sem, 0, 0);
  pInfo->code = 0;

  if (pRes == NULL) {
    asyncCommitAllOffsets(tmq, commitCallBackFn, pInfo);
  } else {
    asyncCommitOffset(tmq, pRes, TDMT_VND_TMQ_COMMIT_OFFSET, commitCallBackFn, pInfo);
  }

  // pInfo must stay alive until the callback has posted the semaphore
  tsem_wait(&pInfo->sem);
  int32_t code = pInfo->code;

  tsem_destroy(&pInfo->sem);
  taosMemoryFree(pInfo);

  tscDebug("consumer:0x%" PRIx64 " sync commit done, code:%s", tmq->consumerId, tstrerror(code));
  return code;
}

// Ask-ep callback used by doAskEp(): records `code` in the SAskEpInfo passed
// as `param`; on success decodes the response that follows the SMqRspHead and
// applies it via doUpdateLocalEp(). The semaphore is always posted last so
// the waiter sees the final state.
void updateEpCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) {
  SAskEpInfo* pAskInfo = param;
  pAskInfo->code = code;

  if (TSDB_CODE_SUCCESS == code) {
    SMqRspHead* pHead = pDataBuf->pData;
    SMqAskEpRsp epRsp;

    // the encoded response body starts right after the response head
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead)), &epRsp);
    doUpdateLocalEp(pTmq, pHead->epoch, &epRsp);
    tDeleteSMqAskEpRsp(&epRsp);
  }

  tsem_post(&pAskInfo->sem);
}

// Ask-ep callback that defers processing: on success the response is wrapped
// in an SMqAskEpRspWrapper queue item (type TMQ_MSG_TYPE__EP_RSP, epoch taken
// from the response head) and written to the consumer's mqueue.
// On failure, or when the queue item cannot be allocated, only terrno is set.
// `param` is not used.
void addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) {
  if (TSDB_CODE_SUCCESS != code) {
    terrno = code;
    return;
  }

  SMqAskEpRspWrapper* pItem = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM, 0);
  if (NULL == pItem) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return;
  }

  SMqRspHead* pHead = pDataBuf->pData;

  pItem->epoch = pHead->epoch;
  pItem->tmqRspType = TMQ_MSG_TYPE__EP_RSP;

  // copy the head bytes first, then decode the body that follows the head
  memcpy(&pItem->msg, pDataBuf->pData, sizeof(SMqRspHead));
  tDecodeSMqAskEpRsp(POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead)), &pItem->msg);

  taosWriteQitem(pTmq->mqueue, pItem);
}

// Synchronous ask-ep: starts asyncAskEp() with updateEpCallbackFn and blocks
// until that callback has posted the semaphore.
//
// Returns the code recorded by the callback, or TSDB_CODE_OUT_OF_MEMORY when
// the wait context cannot be allocated (the allocation result was previously
// dereferenced without a check).
int32_t doAskEp(tmq_t* pTmq) {
  SAskEpInfo* pInfo = taosMemoryMalloc(sizeof(SAskEpInfo));
  if (pInfo == NULL) {
    tscError("consumer:0x%" PRIx64 " failed to prepare ask ep, out of memory", pTmq->consumerId);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tsem_init(&pInfo->sem, 0, 0);
  pInfo->code = 0;

  asyncAskEp(pTmq, updateEpCallbackFn, pInfo);

  // pInfo must stay alive until the callback has posted the semaphore
  tsem_wait(&pInfo->sem);

  int32_t code = pInfo->code;
  tsem_destroy(&pInfo->sem);
  taosMemoryFree(pInfo);
  return code;
}

// Builds a TDMT_MND_TMQ_ASK_EP request for this consumer and sends it to the
// mnode asynchronously. The response is routed through askEpCallbackFn with a
// SMqAskEpCbParam that carries `askEpFn` and `param`.
//
// If the request cannot be built, nothing is sent and `askEpFn` is invoked
// directly with the failure code and a NULL data buffer.
void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param) {
  int32_t          failCode = TSDB_CODE_SUCCESS;
  void*            pBuf = NULL;
  SMqAskEpCbParam* pCbParam = NULL;
  SMsgSendInfo*    pSend = NULL;
  int64_t          transporterId = 0;
  SEpSet           epSet;

  SMqAskEpReq askReq = {0};
  askReq.consumerId = pTmq->consumerId;
  askReq.epoch = pTmq->epoch;
  strcpy(askReq.cgroup, pTmq->groupId);

  // first pass only computes the encoded size
  int32_t reqLen = tSerializeSMqAskEpReq(NULL, 0, &askReq);
  if (reqLen < 0) {
    tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", pTmq->consumerId);
    failCode = TSDB_CODE_INVALID_PARA;
    goto _notify;
  }

  pBuf = taosMemoryCalloc(1, reqLen);
  if (pBuf == NULL) {
    tscError("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", pTmq->consumerId, reqLen);
    failCode = TSDB_CODE_OUT_OF_MEMORY;
    goto _notify;
  }

  if (tSerializeSMqAskEpReq(pBuf, reqLen, &askReq) < 0) {
    tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", pTmq->consumerId, reqLen);
    failCode = TSDB_CODE_INVALID_PARA;
    goto _free_req;
  }

  pCbParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
  if (pCbParam == NULL) {
    tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", pTmq->consumerId);
    failCode = TSDB_CODE_OUT_OF_MEMORY;
    goto _free_req;
  }

  pCbParam->refId = pTmq->refId;
  pCbParam->epoch = pTmq->epoch;
  pCbParam->pUserFn = askEpFn;
  pCbParam->pParam = param;

  pSend = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pSend == NULL) {
    failCode = TSDB_CODE_OUT_OF_MEMORY;
    goto _free_param;
  }

  pSend->msgInfo = (SDataBuf){.pData = pBuf, .len = reqLen, .handle = NULL};

  pSend->requestId = generateRequestId();
  pSend->requestObjRefId = 0;
  pSend->param = pCbParam;
  pSend->fp = askEpCallbackFn;
  pSend->msgType = TDMT_MND_TMQ_ASK_EP;

  epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp);
  tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, reqId:0x%" PRIx64, pTmq->consumerId, pSend->requestId);

  asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, pSend);
  return;

  // failure ladder: release what was allocated so far, then tell the caller
_free_param:
  taosMemoryFree(pCbParam);
_free_req:
  taosMemoryFree(pBuf);
_notify:
  askEpFn(pTmq, failCode, NULL, param);
}

// Formats "<topicName>:<vg>" into dst and returns the number of characters
// written, not counting the terminating NUL.
// The caller must supply a buffer large enough for the result; no bound is
// enforced here.
int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg) {
  const int32_t written = sprintf(dst, "%s:%d", topicName, vg);
  return written;
}

// Completes a commit: acquires the consumer referenced by the parameter set,
// invokes the stored callback with the stored code and user parameter, then
// frees the parameter set and releases the reference.
//
// If the consumer can no longer be acquired, the parameter set is freed
// without invoking the callback, terrno is set to
// TSDB_CODE_TMQ_CONSUMER_CLOSED and -1 is returned. Returns 0 otherwise.
int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
  const int64_t consumerRef = pParamSet->refId;

  tmq_t* pConsumer = taosAcquireRef(tmqMgmt.rsetId, consumerRef);
  if (pConsumer != NULL) {
    // if no more waiting rsp
    pParamSet->callbackFn(pConsumer, pParamSet->code, pParamSet->userParam);
    taosMemoryFree(pParamSet);

    taosReleaseRef(tmqMgmt.rsetId, consumerRef);
    return 0;
  }

  taosMemoryFree(pParamSet);
  terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
  return -1;
}

2449
// Accounts for one received commit response. The outstanding-response counter
// of the parameter set is decremented atomically; the caller that brings it
// to zero finishes the commit through tmqCommitDone(). consumerId, pTopic and
// vgId are used for logging only.
void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) {
  int32_t remain = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);

  if (remain != 0) {
    tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d commit-rsp received, remain:%d", consumerId, pTopic, vgId,
             remain);
    return;
  }

  tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d all commit-rsp received, commit completed", consumerId, pTopic,
           vgId);
  tmqCommitDone(pParamSet);
}
2460 2461 2462 2463 2464 2465 2466 2467 2468 2469 2470 2471 2472 2473 2474 2475 2476 2477 2478 2479 2480 2481

// Advances the result iterator to the next data block and loads that block
// into the result info of `res`.
//
// When the response carries schemas, the schema of the new block is installed
// and the per-block buffers of the result info are released first.
// Returns the result info, or NULL once the iterator has moved past the last
// block.
SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) {
  SMqRspObj* pData = (SMqRspObj*)res;

  pData->resIter++;
  if (!(pData->resIter < pData->rsp.blockNum)) {
    return NULL;
  }

  SReqResultInfo*    pResInfo = &pData->resInfo;
  SRetrieveTableRsp* pBlock = (SRetrieveTableRsp*)taosArrayGetP(pData->rsp.blockData, pData->resIter);

  if (pData->rsp.withSchema) {
    SSchemaWrapper* pSchema = (SSchemaWrapper*)taosArrayGetP(pData->rsp.blockSchema, pData->resIter);
    setResSchemaInfo(&pData->resInfo, pSchema->pSchema, pSchema->nCols);

    // buffers sized for the previous block's schema are dropped
    taosMemoryFreeClear(pData->resInfo.row);
    taosMemoryFreeClear(pData->resInfo.pCol);
    taosMemoryFreeClear(pData->resInfo.length);
    taosMemoryFreeClear(pData->resInfo.convertBuf);
    taosMemoryFreeClear(pData->resInfo.convertJson);
  }

  setQueryResultFromRsp(pResInfo, pBlock, convertUcs4, false);
  return pResInfo;
}

2484 2485 2486 2487 2488 2489 2490 2491 2492 2493 2494 2495 2496 2497 2498 2499 2500 2501 2502 2503 2504
// Response callback for one TDMT_VND_TMQ_VG_WALINFO request.
//
// On success the WAL range from the response head and the decoded offset are
// appended (under the shared mutex) to the assignment list in pCommon. On
// failure the code is stored in pCommon->code. The callback that accounts for
// the last of `totalReq` responses posts pCommon->rsp to wake the waiter.
//
// The response counter is incremented only AFTER this callback has finished
// touching pCommon. Previously it was incremented first, so the last callback
// could wake the waiter (which destroys pCommon) while another callback was
// still pushing its result.
//
// Always frees `param` and returns 0.
static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqVgWalInfoParam* pParam = param;
  SMqVgCommon*       pCommon = pParam->pCommon;

  if (code != TSDB_CODE_SUCCESS) {
    tscError("consumer:0x%" PRIx64 " failed to get the wal info from vgId:%d for topic:%s", pCommon->consumerId,
             pParam->vgId, pCommon->pTopicName);
    pCommon->code = code;
  } else {
    SMqDataRsp rsp = {0};
    SDecoder   decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeMqDataRsp(&decoder, &rsp);
    tDecoderClear(&decoder);

    SMqRspHead* pHead = pMsg->pData;

    tmq_topic_assignment assignment = {.begin = pHead->walsver,
                                       .end = pHead->walever,
                                       .currentOffset = rsp.rspOffset.version,
                                       .vgId = pParam->vgId};

    taosThreadMutexLock(&pCommon->mutex);
    taosArrayPush(pCommon->pList, &assignment);
    taosThreadMutexUnlock(&pCommon->mutex);
  }

  // Count this response last: once the counter reaches totalReq the waiter may
  // destroy pCommon, so only the final callback may touch it afterwards (to post).
  int32_t totalReq = pParam->totalReq;
  int32_t total = atomic_add_fetch_32(&pCommon->numOfRsp, 1);
  if (total == totalReq) {
    tsem_post(&pCommon->rsp);
  }

  taosMemoryFree(pParam);
  return 0;
}

// Releases everything owned by the shared wal-info context -- topic name copy,
// mutex, semaphore and assignment list -- and then the context itself.
static void destroyCommonInfo(SMqVgCommon* pCommon) {
  SMqVgCommon* pShared = pCommon;

  taosArrayDestroy(pShared->pList);
  tsem_destroy(&pShared->rsp);
  taosThreadMutexDestroy(&pShared->mutex);
  taosMemoryFree(pShared->pTopicName);

  taosMemoryFree(pShared);
}

H
Haojun Liao 已提交
2528
int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_assignment** assignment,
H
Haojun Liao 已提交
2529
                                 int32_t* numOfAssignment) {
H
Haojun Liao 已提交
2530 2531 2532
  *numOfAssignment = 0;
  *assignment = NULL;

2533
  int32_t accId = tmq->pTscObj->acctId;
2534
  char    tname[128] = {0};
2535 2536 2537
  sprintf(tname, "%d.%s", accId, pTopicName);

  SMqClientTopic* pTopic = getTopicByName(tmq, tname);
H
Haojun Liao 已提交
2538 2539 2540 2541 2542 2543
  if (pTopic == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  // in case of snapshot is opened, no valid offset will return
  *numOfAssignment = taosArrayGetSize(pTopic->vgs);
2544 2545 2546 2547 2548 2549 2550 2551

  *assignment = taosMemoryCalloc(*numOfAssignment, sizeof(tmq_topic_assignment));
  if (*assignment == NULL) {
    tscError("consumer:0x%" PRIx64 " failed to malloc buffer, size:%" PRIzu, tmq->consumerId,
             (*numOfAssignment) * sizeof(tmq_topic_assignment));
    return TSDB_CODE_OUT_OF_MEMORY;
  }

2552 2553
  bool needFetch = false;

H
Haojun Liao 已提交
2554 2555
  for (int32_t j = 0; j < (*numOfAssignment); ++j) {
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
2556
    if (!pClientVg->receivedInfoFromVnode) {
2557 2558 2559
      needFetch = true;
      break;
    }
H
Haojun Liao 已提交
2560 2561 2562 2563 2564 2565 2566 2567 2568 2569

    tmq_topic_assignment* pAssignment = &(*assignment)[j];
    if (pClientVg->offsetInfo.currentOffset.type == TMQ_OFFSET__LOG) {
      pAssignment->currentOffset = pClientVg->offsetInfo.currentOffset.version;
    } else {
      pAssignment->currentOffset = 0;
    }

    pAssignment->begin = pClientVg->offsetInfo.walVerBegin;
    pAssignment->end = pClientVg->offsetInfo.walVerEnd;
2570
    pAssignment->vgId = pClientVg->vgId;
H
Haojun Liao 已提交
2571 2572
  }

2573 2574 2575 2576 2577 2578 2579 2580 2581 2582 2583 2584 2585 2586 2587 2588 2589 2590 2591 2592 2593 2594 2595 2596 2597 2598 2599 2600 2601 2602 2603 2604 2605 2606 2607 2608 2609 2610 2611 2612 2613 2614 2615 2616 2617 2618 2619 2620 2621 2622 2623 2624 2625 2626 2627 2628 2629 2630 2631 2632 2633 2634 2635 2636 2637 2638 2639 2640
  if (needFetch) {
    SMqVgCommon* pCommon = taosMemoryCalloc(1, sizeof(SMqVgCommon));
    if (pCommon == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return terrno;
    }

    pCommon->pList= taosArrayInit(4, sizeof(tmq_topic_assignment));
    tsem_init(&pCommon->rsp, 0, 0);
    taosThreadMutexInit(&pCommon->mutex, 0);
    pCommon->pTopicName = taosStrdup(pTopic->topicName);
    pCommon->consumerId = tmq->consumerId;

    terrno = TSDB_CODE_OUT_OF_MEMORY;
    for (int32_t i = 0; i < (*numOfAssignment); ++i) {
      SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);

      SMqVgWalInfoParam* pParam = taosMemoryMalloc(sizeof(SMqVgWalInfoParam));
      if (pParam == NULL) {
        destroyCommonInfo(pCommon);
        return terrno;
      }

      pParam->epoch = tmq->epoch;
      pParam->vgId = pClientVg->vgId;
      pParam->totalReq = *numOfAssignment;
      pParam->pCommon = pCommon;

      SMqPollReq req = {0};
      tmqBuildConsumeReqImpl(&req, tmq, 10, pTopic, pClientVg);

      int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
      if (msgSize < 0) {
        taosMemoryFree(pParam);
        destroyCommonInfo(pCommon);
        return terrno;
      }

      char* msg = taosMemoryCalloc(1, msgSize);
      if (NULL == msg) {
        taosMemoryFree(pParam);
        destroyCommonInfo(pCommon);
        return terrno;
      }

      if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
        taosMemoryFree(msg);
        taosMemoryFree(pParam);
        destroyCommonInfo(pCommon);
        return terrno;
      }

      SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
      if (sendInfo == NULL) {
        taosMemoryFree(pParam);
        taosMemoryFree(msg);
        destroyCommonInfo(pCommon);
        return terrno;
      }

      sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
      sendInfo->requestId = req.reqId;
      sendInfo->requestObjRefId = 0;
      sendInfo->param = pParam;
      sendInfo->fp = tmqGetWalInfoCb;
      sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO;

      int64_t transporterId = 0;
2641
      char    offsetFormatBuf[TSDB_OFFSET_LEN];
2642 2643 2644 2645 2646 2647 2648 2649 2650 2651 2652 2653 2654
      tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.currentOffset);

      tscDebug("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64,
               tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, &transporterId, sendInfo);
    }

    tsem_wait(&pCommon->rsp);
    int32_t code = pCommon->code;

    terrno = code;
    if (code != TSDB_CODE_SUCCESS) {
      taosMemoryFree(*assignment);
2655
      *assignment = NULL;
2656 2657 2658 2659 2660 2661 2662 2663 2664
      *numOfAssignment = 0;
    } else {
      int32_t num = taosArrayGetSize(pCommon->pList);
      for(int32_t i = 0; i < num; ++i) {
        (*assignment)[i] = *(tmq_topic_assignment*)taosArrayGet(pCommon->pList, i);
      }
      *numOfAssignment = num;
    }

2665 2666 2667 2668 2669 2670 2671 2672 2673 2674 2675 2676 2677
    for (int32_t j = 0; j < (*numOfAssignment); ++j) {
      tmq_topic_assignment* p = &(*assignment)[j];

      for(int32_t i = 0; i < taosArrayGetSize(pTopic->vgs); ++i) {
        SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
        if (pClientVg->vgId != p->vgId) {
          continue;
        }

        SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo;

        pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG;

2678
        char offsetBuf[TSDB_OFFSET_LEN] = {0};
2679 2680 2681 2682 2683 2684 2685 2686 2687 2688 2689
        tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset);

        tscDebug("vgId:%d offset is update to:%s", p->vgId, offsetBuf);

        pOffsetInfo->walVerBegin = p->begin;
        pOffsetInfo->walVerEnd = p->end;
        pOffsetInfo->currentOffset.version = p->currentOffset;
        pOffsetInfo->committedOffset.version = p->currentOffset;
      }
    }

2690 2691 2692 2693 2694
    destroyCommonInfo(pCommon);
    return code;
  } else {
    return TSDB_CODE_SUCCESS;
  }
H
Haojun Liao 已提交
2695 2696
}

T
t_max 已提交
2697 2698 2699 2700 2701 2702 2703 2704
// Releases an assignment array with taosMemoryFree(). A NULL pointer is
// accepted and ignored.
void tmq_free_assignment(tmq_topic_assignment* pAssignment) {
  if (pAssignment != NULL) {
    taosMemoryFree(pAssignment);
  }
}

2705
// Moves the consume position of one vgroup of a subscribed topic to `offset`
// and sends the new position to the vnode, blocking until the vnode answers.
//
// Seeking is only permitted when the vgroup's current offset is a WAL version
// (TMQ_OFFSET__LOG) or a reset offset (negative type). For a WAL-version
// offset, `offset` must lie inside [walVerBegin, walVerEnd].
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for a NULL handle/topic
// name, an unknown topic or vgroup, a non-seekable offset type or an
// out-of-range offset, TSDB_CODE_OUT_OF_MEMORY, or the code returned by the
// seek request.
int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) {
  if (tmq == NULL) {
    tscError("invalid tmq handle, null");
    return TSDB_CODE_INVALID_PARA;
  }

  if (pTopicName == NULL) {
    tscError("consumer:0x%" PRIx64 " invalid topic name, null", tmq->consumerId);
    return TSDB_CODE_INVALID_PARA;
  }

  // bounded formatting: an over-long topic name is truncated (and then simply
  // not found) instead of overflowing the stack buffer
  int32_t accId = tmq->pTscObj->acctId;
  char    tname[128] = {0};
  snprintf(tname, sizeof(tname), "%d.%s", accId, pTopicName);

  SMqClientTopic* pTopic = getTopicByName(tmq, tname);
  if (pTopic == NULL) {
    tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
    return TSDB_CODE_INVALID_PARA;
  }

  SMqClientVg* pVg = NULL;
  int32_t      numOfVgs = taosArrayGetSize(pTopic->vgs);
  for (int32_t i = 0; i < numOfVgs; ++i) {
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
    if (pClientVg->vgId == vgId) {
      pVg = pClientVg;
      break;
    }
  }

  if (pVg == NULL) {
    tscError("consumer:0x%" PRIx64 " invalid vgroup id:%d", tmq->consumerId, vgId);
    return TSDB_CODE_INVALID_PARA;
  }

  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;

  int32_t type = pOffsetInfo->currentOffset.type;
  if (type != TMQ_OFFSET__LOG && !OFFSET_IS_RESET_OFFSET(type)) {
    tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type);
    return TSDB_CODE_INVALID_PARA;
  }

  if (type == TMQ_OFFSET__LOG && (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd)) {
    tscError("consumer:0x%" PRIx64 " invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]",
             tmq->consumerId, offset, pOffsetInfo->walVerBegin, pOffsetInfo->walVerEnd);
    return TSDB_CODE_INVALID_PARA;
  }

  // allocate the wait context before touching local state, so an allocation
  // failure leaves the consumer's offsets unchanged
  SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
  if (pInfo == NULL) {
    tscError("consumer:0x%"PRIx64" failed to prepare seek operation", tmq->consumerId);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tsem_init(&pInfo->sem, 0, 0);
  pInfo->code = 0;

  // update the offset, and then commit to vnode
  if (pOffsetInfo->currentOffset.type == TMQ_OFFSET__LOG) {
    pOffsetInfo->currentOffset.version = offset;
    pOffsetInfo->committedOffset.version = INT64_MIN;
    pVg->seekUpdated = true;
  }

  // stack result object that only identifies the topic/vgroup to the commit path
  SMqRspObj rspObj = {.resType = RES_TYPE__TMQ, .vgId = pVg->vgId};
  tstrncpy(rspObj.topic, tname, tListLen(rspObj.topic));

  tscDebug("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, pVg->vgId);

  asyncCommitOffset(tmq, &rspObj, TDMT_VND_TMQ_SEEK_TO_OFFSET, commitCallBackFn, pInfo);

  // pInfo must stay alive until the callback has posted the semaphore
  tsem_wait(&pInfo->sem);
  int32_t code = pInfo->code;

  tsem_destroy(&pInfo->sem);
  taosMemoryFree(pInfo);

  if (code != TSDB_CODE_SUCCESS) {
    tscError("consumer:0x%" PRIx64 " failed to send seek to vgId:%d, code:%s", tmq->consumerId, pVg->vgId,
             tstrerror(code));
  }

  return code;
}