clientTmq.c 92.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "cJSON.h"
17 18 19
#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
H
Haojun Liao 已提交
20
#include "tdatablock.h"
21 22
#include "tdef.h"
#include "tglobal.h"
X
Xiaoyu Wang 已提交
23
#include "tqueue.h"
24
#include "tref.h"
L
Liu Jicong 已提交
25 26
#include "ttimer.h"

X
Xiaoyu Wang 已提交
27 28
#define EMPTY_BLOCK_POLL_IDLE_DURATION 10
#define DEFAULT_AUTO_COMMIT_INTERVAL   5000
29

30 31
#define OFFSET_IS_RESET_OFFSET(_of)  ((_of) < 0)

32 33
typedef void (*__tmq_askep_fn_t)(tmq_t* pTmq, int32_t code, SDataBuf* pBuf, void* pParam);

X
Xiaoyu Wang 已提交
34
struct SMqMgmt {
35 36 37
  int8_t  inited;
  tmr_h   timer;
  int32_t rsetId;
38
};
L
Liu Jicong 已提交
39

X
Xiaoyu Wang 已提交
40 41
static TdThreadOnce   tmqInit = PTHREAD_ONCE_INIT;  // initialize only once
volatile int32_t      tmqInitRes = 0;               // initialize rsp code
42
static struct SMqMgmt tmqMgmt = {0};
43

L
Liu Jicong 已提交
44 45 46 47 48 49
// Minimal response header: a response type tag followed by the epoch.
// NOTE(review): SMqAskEpRspWrapper and SMqPollRspWrapper start with the same two
// fields, presumably so queued items can be inspected through this type — confirm.
typedef struct {
  int8_t  tmqRspType;
  int32_t epoch;
} SMqRspWrapper;

typedef struct {
L
Liu Jicong 已提交
50 51 52
  int8_t      tmqRspType;
  int32_t     epoch;
  SMqAskEpRsp msg;
L
Liu Jicong 已提交
53 54
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
55
struct tmq_list_t {
L
Liu Jicong 已提交
56
  SArray container;
L
Liu Jicong 已提交
57
};
L
Liu Jicong 已提交
58

L
Liu Jicong 已提交
59
struct tmq_conf_t {
60 61 62 63 64 65 66 67
  char           clientId[256];
  char           groupId[TSDB_CGROUP_LEN];
  int8_t         autoCommit;
  int8_t         resetOffset;
  int8_t         withTbName;
  int8_t         snapEnable;
  int32_t        snapBatchSize;
  bool           hbBgEnable;
68 69 70 71 72
  uint16_t       port;
  int32_t        autoCommitInterval;
  char*          ip;
  char*          user;
  char*          pass;
73
  tmq_commit_cb* commitCb;
L
Liu Jicong 已提交
74
  void*          commitCbUserParam;
L
Liu Jicong 已提交
75 76 77
};

struct tmq_t {
78 79 80 81 82 83 84
  int64_t        refId;
  char           groupId[TSDB_CGROUP_LEN];
  char           clientId[256];
  int8_t         withTbName;
  int8_t         useSnapshot;
  int8_t         autoCommit;
  int32_t        autoCommitInterval;
85
  int8_t         resetOffsetCfg;
86 87
  uint64_t       consumerId;
  bool           hbBgEnable;
L
Liu Jicong 已提交
88 89
  tmq_commit_cb* commitCb;
  void*          commitCbUserParam;
L
Liu Jicong 已提交
90 91

  // status
wmmhello's avatar
wmmhello 已提交
92
  SRWLatch        lock;
L
Liu Jicong 已提交
93 94
  int8_t  status;
  int32_t epoch;
L
Liu Jicong 已提交
95 96
#if 0
  int8_t  epStatus;
L
Liu Jicong 已提交
97
  int32_t epSkipCnt;
L
Liu Jicong 已提交
98
#endif
99
  // poll info
X
Xiaoyu Wang 已提交
100 101
  int64_t pollCnt;
  int64_t totalRows;
wmmhello's avatar
wmmhello 已提交
102
//  bool    needReportOffsetRows;
L
Liu Jicong 已提交
103

L
Liu Jicong 已提交
104
  // timer
X
Xiaoyu Wang 已提交
105 106 107 108 109 110 111 112 113 114
  tmr_h       hbLiveTimer;
  tmr_h       epTimer;
  tmr_h       reportTimer;
  tmr_h       commitTimer;
  STscObj*    pTscObj;       // connection
  SArray*     clientTopics;  // SArray<SMqClientTopic>
  STaosQueue* mqueue;        // queue of rsp
  STaosQall*  qall;
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit
  tsem_t      rspSem;
L
Liu Jicong 已提交
115 116
};

117 118
typedef struct SAskEpInfo {
  int32_t code;
H
Haojun Liao 已提交
119
  tsem_t  sem;
120 121
} SAskEpInfo;

X
Xiaoyu Wang 已提交
122 123 124 125 126 127 128 129
// Per-vgroup status values.
// NOTE(review): presumably stored in SMqClientVg.vgStatus — confirm against the poll code.
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT,
};

// Consumer status values.
// NOTE(review): presumably stored in tmq_t.status — confirm against the subscribe/poll code.
enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY,
  TMQ_CONSUMER_STATUS__NO_TOPIC,
  TMQ_CONSUMER_STATUS__RECOVER,
};

L
Liu Jicong 已提交
134
// Task types written as int8_t items onto tmq_t.delayedTask by the timer callbacks.
enum {
  TMQ_DELAYED_TASK__ASK_EP = 1,
  TMQ_DELAYED_TASK__REPORT,
  TMQ_DELAYED_TASK__COMMIT,
};

H
Haojun Liao 已提交
140
typedef struct SVgOffsetInfo {
L
Liu Jicong 已提交
141 142
  STqOffsetVal committedOffset;
  STqOffsetVal currentOffset;
H
Haojun Liao 已提交
143 144 145 146 147 148 149 150 151 152
  int64_t      walVerBegin;
  int64_t      walVerEnd;
} SVgOffsetInfo;

typedef struct {
  int64_t       pollCnt;
  int64_t       numOfRows;
  SVgOffsetInfo offsetInfo;
  int32_t       vgId;
  int32_t       vgStatus;
H
Haojun Liao 已提交
153 154 155 156
  int32_t       vgSkipCnt;              // here used to mark the slow vgroups
  bool          receivedInfoFromVnode;  // has already received info from vnode
  int64_t       emptyBlockReceiveTs;    // once empty block is received, idle for ignoreCnt then start to poll data
  bool          seekUpdated;            // offset is updated by seek operator, therefore, not update by vnode rsp.
H
Haojun Liao 已提交
157
  SEpSet        epSet;
158 159
} SMqClientVg;

L
Liu Jicong 已提交
160
typedef struct {
161 162 163
  char           topicName[TSDB_TOPIC_FNAME_LEN];
  char           db[TSDB_DB_FNAME_LEN];
  SArray*        vgs;  // SArray<SMqClientVg>
L
Liu Jicong 已提交
164
  SSchemaWrapper schema;
165 166
} SMqClientTopic;

L
Liu Jicong 已提交
167 168
typedef struct {
  int8_t          tmqRspType;
169
  int32_t         epoch;  // epoch can be used to guard the vgHandle
170
  int32_t         vgId;
wmmhello's avatar
wmmhello 已提交
171
  char            topicName[TSDB_TOPIC_FNAME_LEN];
L
Liu Jicong 已提交
172 173
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
H
Haojun Liao 已提交
174
  uint64_t        reqId;
175
  SEpSet*         pEpset;
L
Liu Jicong 已提交
176
  union {
L
Liu Jicong 已提交
177 178
    SMqDataRsp dataRsp;
    SMqMetaRsp metaRsp;
L
Liu Jicong 已提交
179
    STaosxRsp  taosxRsp;
L
Liu Jicong 已提交
180
  };
L
Liu Jicong 已提交
181 182
} SMqPollRspWrapper;

L
Liu Jicong 已提交
183
typedef struct {
wmmhello's avatar
wmmhello 已提交
184 185
//  int64_t refId;
//  int32_t epoch;
L
Liu Jicong 已提交
186 187
  tsem_t  rspSem;
  int32_t rspErr;
L
Liu Jicong 已提交
188
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
189

L
Liu Jicong 已提交
190
typedef struct {
191 192 193 194
  int64_t          refId;
  int32_t          epoch;
  void*            pParam;
  __tmq_askep_fn_t pUserFn;
195 196
} SMqAskEpCbParam;

L
Liu Jicong 已提交
197
typedef struct {
198 199
  int64_t         refId;
  int32_t         epoch;
wmmhello's avatar
wmmhello 已提交
200 201 202
  char            topicName[TSDB_TOPIC_FNAME_LEN];
//  SMqClientVg*    pVg;
//  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
203
  int32_t         vgId;
X
Xiaoyu Wang 已提交
204
  uint64_t        requestId;  // request id for debug purpose
X
Xiaoyu Wang 已提交
205
} SMqPollCbParam;
206

207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223
// State referenced from SMqVgWalInfoParam.pCommon.
// NOTE(review): presumably shared by all per-vgroup WAL-info requests of one call,
// with `mutex` guarding `pList`/`numOfRsp` and `rsp` signalling completion — the
// code that uses it is outside this view, confirm there.
typedef struct SMqVgCommon {
  tsem_t        rsp;
  int32_t       numOfRsp;
  SArray*       pList;
  TdThreadMutex mutex;
  int64_t       consumerId;
  char*         pTopicName;
  int32_t       code;
} SMqVgCommon;

// Per-vgroup parameter that points at an SMqVgCommon.
// NOTE(review): presumably one instance per WAL-info request — confirm at the use site.
typedef struct SMqVgWalInfoParam {
  int32_t      vgId;
  int32_t      epoch;
  int32_t      totalReq;
  SMqVgCommon* pCommon;
} SMqVgWalInfoParam;

224
typedef struct {
225 226
  int64_t        refId;
  int32_t        epoch;
L
Liu Jicong 已提交
227 228
  int32_t        waitingRspNum;
  int32_t        totalRspNum;
229
  int32_t        code;
230
  tmq_commit_cb* callbackFn;
L
Liu Jicong 已提交
231 232
  /*SArray*        successfulOffsets;*/
  /*SArray*        failedOffsets;*/
X
Xiaoyu Wang 已提交
233
  void* userParam;
234 235 236 237
} SMqCommitCbParamSet;

typedef struct {
  SMqCommitCbParamSet* params;
238
  SMqVgOffset*         pOffset;
H
Haojun Liao 已提交
239 240 241
  char                 topicName[TSDB_TOPIC_FNAME_LEN];
  int32_t              vgId;
  tmq_t*               pTmq;
242
} SMqCommitCbParam;
243

244 245 246 247 248
// Semaphore plus result code.
// NOTE(review): presumably lets a synchronous commit wait for the asynchronous
// commit callback — the user is outside this view, confirm there.
typedef struct SSyncCommitInfo {
  tsem_t  sem;
  int32_t code;
} SSyncCommitInfo;

249
static int32_t doAskEp(tmq_t* tmq);
250 251
static int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg);
static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet);
252
static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicName, SMqCommitCbParamSet* pParamSet,
253
                               int32_t index, int32_t totalVgroups, int32_t type);
X
Xiaoyu Wang 已提交
254 255 256
static void    commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId);
static void    asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param);
static void    addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param);
257

258
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
259
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
260 261 262 263 264
  if (conf == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return conf;
  }

265
  conf->withTbName = false;
L
Liu Jicong 已提交
266
  conf->autoCommit = true;
267
  conf->autoCommitInterval = DEFAULT_AUTO_COMMIT_INTERVAL;
268
  conf->resetOffset = TMQ_OFFSET__RESET_EARLIEST;
269
  conf->hbBgEnable = true;
270

271 272 273
  return conf;
}

L
Liu Jicong 已提交
274
/**
 * Release a configuration created by tmq_conf_new(), including the heap copies
 * of the connection ip, user and password. A NULL conf is ignored.
 */
void tmq_conf_destroy(tmq_conf_t* conf) {
  if (conf == NULL) {
    return;
  }

  if (conf->ip) {
    taosMemoryFree(conf->ip);
  }
  if (conf->user) {
    taosMemoryFree(conf->user);
  }
  if (conf->pass) {
    taosMemoryFree(conf->pass);
  }
  taosMemoryFree(conf);
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
290
  if (strcasecmp(key, "group.id") == 0) {
L
Liu Jicong 已提交
291
    tstrncpy(conf->groupId, value, TSDB_CGROUP_LEN);
L
Liu Jicong 已提交
292
    return TMQ_CONF_OK;
293
  }
L
Liu Jicong 已提交
294

295
  if (strcasecmp(key, "client.id") == 0) {
L
Liu Jicong 已提交
296
    tstrncpy(conf->clientId, value, 256);
L
Liu Jicong 已提交
297 298
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
299

300 301
  if (strcasecmp(key, "enable.auto.commit") == 0) {
    if (strcasecmp(value, "true") == 0) {
L
Liu Jicong 已提交
302
      conf->autoCommit = true;
L
Liu Jicong 已提交
303
      return TMQ_CONF_OK;
304
    } else if (strcasecmp(value, "false") == 0) {
L
Liu Jicong 已提交
305
      conf->autoCommit = false;
L
Liu Jicong 已提交
306 307 308 309
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
310
  }
L
Liu Jicong 已提交
311

312
  if (strcasecmp(key, "auto.commit.interval.ms") == 0) {
313
    conf->autoCommitInterval = taosStr2int64(value);
L
Liu Jicong 已提交
314 315 316
    return TMQ_CONF_OK;
  }

317 318 319
  if (strcasecmp(key, "auto.offset.reset") == 0) {
    if (strcasecmp(value, "none") == 0) {
      conf->resetOffset = TMQ_OFFSET__RESET_NONE;
L
Liu Jicong 已提交
320
      return TMQ_CONF_OK;
321
    } else if (strcasecmp(value, "earliest") == 0) {
322
      conf->resetOffset = TMQ_OFFSET__RESET_EARLIEST;
L
Liu Jicong 已提交
323
      return TMQ_CONF_OK;
324 325
    } else if (strcasecmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_OFFSET__RESET_LATEST;
L
Liu Jicong 已提交
326 327 328 329 330
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
331

332 333
  if (strcasecmp(key, "msg.with.table.name") == 0) {
    if (strcasecmp(value, "true") == 0) {
334
      conf->withTbName = true;
L
Liu Jicong 已提交
335
      return TMQ_CONF_OK;
336
    } else if (strcasecmp(value, "false") == 0) {
337
      conf->withTbName = false;
L
Liu Jicong 已提交
338
      return TMQ_CONF_OK;
339 340 341 342 343
    } else {
      return TMQ_CONF_INVALID;
    }
  }

344 345
  if (strcasecmp(key, "experimental.snapshot.enable") == 0) {
    if (strcasecmp(value, "true") == 0) {
L
Liu Jicong 已提交
346
      conf->snapEnable = true;
347
      return TMQ_CONF_OK;
348
    } else if (strcasecmp(value, "false") == 0) {
L
Liu Jicong 已提交
349
      conf->snapEnable = false;
350 351 352 353 354 355
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }

356
  if (strcasecmp(key, "experimental.snapshot.batch.size") == 0) {
357
    conf->snapBatchSize = taosStr2int64(value);
L
Liu Jicong 已提交
358 359 360
    return TMQ_CONF_OK;
  }

361
//  if (strcasecmp(key, "enable.heartbeat.background") == 0) {
X
Xiaoyu Wang 已提交
362 363 364 365 366 367 368
    //    if (strcasecmp(value, "true") == 0) {
    //      conf->hbBgEnable = true;
    //      return TMQ_CONF_OK;
    //    } else if (strcasecmp(value, "false") == 0) {
    //      conf->hbBgEnable = false;
    //      return TMQ_CONF_OK;
    //    } else {
369 370
//    tscError("the default value of enable.heartbeat.background is true, can not be seted");
//    return TMQ_CONF_INVALID;
X
Xiaoyu Wang 已提交
371
    //    }
372
//  }
L
Liu Jicong 已提交
373

374
  if (strcasecmp(key, "td.connect.ip") == 0) {
375
    conf->ip = taosStrdup(value);
L
Liu Jicong 已提交
376 377
    return TMQ_CONF_OK;
  }
378

379
  if (strcasecmp(key, "td.connect.user") == 0) {
380
    conf->user = taosStrdup(value);
L
Liu Jicong 已提交
381 382
    return TMQ_CONF_OK;
  }
383

384
  if (strcasecmp(key, "td.connect.pass") == 0) {
385
    conf->pass = taosStrdup(value);
L
Liu Jicong 已提交
386 387
    return TMQ_CONF_OK;
  }
388

389
  if (strcasecmp(key, "td.connect.port") == 0) {
390
    conf->port = taosStr2int64(value);
L
Liu Jicong 已提交
391 392
    return TMQ_CONF_OK;
  }
393

394
  if (strcasecmp(key, "td.connect.db") == 0) {
L
Liu Jicong 已提交
395 396 397
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
398
  return TMQ_CONF_UNKNOWN;
399 400
}

X
Xiaoyu Wang 已提交
401
tmq_list_t* tmq_list_new() { return (tmq_list_t*)taosArrayInit(0, sizeof(void*)); }
402

L
Liu Jicong 已提交
403 404
/**
 * Append a copy of a topic name to the list.
 *
 * @param list  list created by tmq_list_new()
 * @param src   non-empty topic name; it is duplicated, the caller keeps ownership of src
 * @return 0 on success, -1 when an argument is NULL/empty or memory cannot be allocated.
 */
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  if (list == NULL || src == NULL || src[0] == 0) return -1;

  SArray* container = &list->container;
  char*   topic = taosStrdup(src);
  if (topic == NULL) return -1;

  if (taosArrayPush(container, &topic) == NULL) {
    // the list did not take ownership of the copy
    taosMemoryFree(topic);
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
411
/**
 * Destroy a topic list together with the topic-name copies it holds.
 * A NULL list is ignored.
 */
void tmq_list_destroy(tmq_list_t* list) {
  if (list == NULL) return;

  SArray* container = &list->container;
  taosArrayDestroyP(container, taosMemoryFree);
}

L
Liu Jicong 已提交
416 417 418 419 420 421 422 423 424 425
// Number of topic names currently stored in the list.
int32_t tmq_list_get_size(const tmq_list_t* list) {
  return taosArrayGetSize(&list->container);
}

// Expose the list's backing storage as an array of C strings. The array is the
// list's own pData buffer, not a copy.
char** tmq_list_to_c_array(const tmq_list_t* list) {
  return list->container.pData;
}

426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449
//static SMqClientVg* foundClientVg(SArray* pTopicList, const char* pName, int32_t vgId, int32_t* index,
//                                  int32_t* numOfVgroups) {
//  int32_t numOfTopics = taosArrayGetSize(pTopicList);
//  *index = -1;
//  *numOfVgroups = 0;
//
//  for (int32_t i = 0; i < numOfTopics; ++i) {
//    SMqClientTopic* pTopic = taosArrayGet(pTopicList, i);
//    if (strcmp(pTopic->topicName, pName) != 0) {
//      continue;
//    }
//
//    *numOfVgroups = taosArrayGetSize(pTopic->vgs);
//    for (int32_t j = 0; j < (*numOfVgroups); ++j) {
//      SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
//      if (pClientVg->vgId == vgId) {
//        *index = j;
//        return pClientVg;
//      }
//    }
//  }
//
//  return NULL;
//}
450

451 452 453
// Two known issues related to the commit callback:
// 1. Update of the epset: no action is needed here, the response of the poll request handles it automatically.
// 2. Commit failure: not handled yet; this one still needs to be resolved.
H
Haojun Liao 已提交
454
/**
 * Response callback of a single commit-offset request.
 *
 * Releases the offset payload and the response buffers, then counts the
 * response down on the shared parameter set.
 *
 * The response `code` is currently not inspected: a retry-on-failure path used
 * to exist here but is disabled, so a failed commit is not retried.
 * TODO: replace the raw pTmq pointer in the parameter with the consumer refId.
 *
 * @param param  SMqCommitCbParam built by doSendCommitMsg()
 * @param pBuf   response buffers; pData and pEpSet are released here
 * @param code   response code (unused)
 * @return always 0
 */
static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
  SMqCommitCbParam*    pParam = (SMqCommitCbParam*)param;
  SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;

  taosMemoryFree(pParam->pOffset);
  if (pBuf != NULL) {  // same guard as tmqHbCb
    taosMemoryFree(pBuf->pData);
    taosMemoryFree(pBuf->pEpSet);
  }

  commitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId);
  return 0;
}

497
/**
 * Build and asynchronously send one commit-offset message for a vgroup.
 *
 * The committed value is pVg->offsetInfo.currentOffset; the subscription key is
 * "<groupId><TMQ_SEPARATOR><topicName>". On success the request is counted on
 * pParamSet (waitingRspNum / totalRspNum) and tmqCommitCb() runs on the response.
 *
 * @param tmq           consumer
 * @param pVg           vgroup whose current offset is committed
 * @param pTopicName    topic the vgroup belongs to
 * @param pParamSet     set shared by all requests of the same commit call
 * @param index         ordinal of the vgroup, used for logging only
 * @param totalVgroups  number of vgroups of the topic, used for logging only
 * @param type          message type placed in the send info
 * @return TSDB_CODE_SUCCESS once the message has been handed to the transport,
 *         TSDB_CODE_OUT_OF_MEMORY or TSDB_CODE_INVALID_PARA otherwise. On failure
 *         nothing is counted on pParamSet and every local allocation is released.
 */
static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicName, SMqCommitCbParamSet* pParamSet,
                               int32_t index, int32_t totalVgroups, int32_t type) {
  SMqVgOffset* pOffset = taosMemoryCalloc(1, sizeof(SMqVgOffset));
  if (pOffset == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pOffset->consumerId = tmq->consumerId;
  pOffset->offset.val = pVg->offsetInfo.currentOffset;

  // subKey = groupId + separator + topic name
  int32_t groupLen = strlen(tmq->groupId);
  memcpy(pOffset->offset.subKey, tmq->groupId, groupLen);
  pOffset->offset.subKey[groupLen] = TMQ_SEPARATOR;
  strcpy(pOffset->offset.subKey + groupLen + 1, pTopicName);

  int32_t len = 0;
  int32_t code = 0;
  tEncodeSize(tEncodeMqVgOffset, pOffset, len, code);
  if (code < 0) {
    taosMemoryFree(pOffset);  // was leaked on this path
    return TSDB_CODE_INVALID_PARA;
  }

  void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
  if (buf == NULL) {
    taosMemoryFree(pOffset);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  ((SMsgHead*)buf)->vgId = htonl(pVg->vgId);

  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, len);
  tEncodeMqVgOffset(&encoder, pOffset);
  tEncoderClear(&encoder);

  // build param
  SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
  if (pParam == NULL) {
    taosMemoryFree(pOffset);
    taosMemoryFree(buf);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pParam->params = pParamSet;
  pParam->pOffset = pOffset;
  pParam->vgId = pVg->vgId;
  pParam->pTmq = tmq;

  tstrncpy(pParam->topicName, pTopicName, tListLen(pParam->topicName));

  // build send info
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    taosMemoryFree(pOffset);
    taosMemoryFree(buf);
    taosMemoryFree(pParam);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pMsgSendInfo->msgInfo = (SDataBuf) { .pData = buf, .len = sizeof(SMsgHead) + len, .handle = NULL };

  pMsgSendInfo->requestId = generateRequestId();
  pMsgSendInfo->requestObjRefId = 0;
  pMsgSendInfo->param = pParam;
  pMsgSendInfo->paramFreeFp = taosMemoryFree;
  pMsgSendInfo->fp = tmqCommitCb;
  pMsgSendInfo->msgType = type;

  // count the request only after every failure exit above, so that callers may
  // free pParamSet when this function fails
  atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
  atomic_add_fetch_32(&pParamSet->totalRspNum, 1);

  SEp* pEp = GET_ACTIVE_EP(&pVg->epSet);
  char offsetBuf[TSDB_OFFSET_LEN] = {0};
  tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffset->offset.val);

  char commitBuf[TSDB_OFFSET_LEN] = {0};
  tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset);
  tscInfo("consumer:0x%" PRIx64 " topic:%s on vgId:%d send offset:%s prev:%s, ep:%s:%d, ordinal:%d/%d, req:0x%" PRIx64,
           tmq->consumerId, pOffset->offset.subKey, pVg->vgId, offsetBuf, commitBuf, pEp->fqdn, pEp->port, index + 1,
           totalVgroups, pMsgSendInfo->requestId);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);

  return TSDB_CODE_SUCCESS;
}

H
Haojun Liao 已提交
586 587 588 589 590 591 592 593 594 595 596
/**
 * Find a subscribed topic of the consumer by exact (case-sensitive) name.
 *
 * @return the matching entry of tmq->clientTopics, or NULL (after logging an
 *         error) when no topic has that name.
 */
static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) {
  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < numOfTopics; ++i) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    if (strcmp(pTopic->topicName, pTopicName) == 0) {
      return pTopic;
    }
  }

  tscError("consumer:0x%" PRIx64 ", total:%d, failed to find topic:%s", tmq->consumerId, numOfTopics, pTopicName);
  return NULL;
}

601
/**
 * Asynchronously commit the offset of the vgroup that produced a poll result.
 *
 * The topic and vgroup are taken from pRes. pCommitFp is invoked exactly once:
 * directly from this function when nothing is sent (invalid arguments, unknown
 * result type, allocation failure, topic/vgroup no longer present, offset
 * already committed, or send failure), otherwise after the commit response has
 * been counted down.
 *
 * Note: a topic or vgroup that cannot be found is reported to the callback as
 * TSDB_CODE_SUCCESS.
 *
 * @param tmq        consumer
 * @param pRes       poll result (data, meta or metadata response object)
 * @param type       message type forwarded to doSendCommitMsg()
 * @param pCommitFp  completion callback
 * @param userParam  opaque argument passed to pCommitFp
 */
static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tmq_commit_cb* pCommitFp, void* userParam) {
  char*   pTopicName = NULL;
  int32_t vgId = 0;
  int32_t code = 0;

  if (pRes == NULL || tmq == NULL) {
    pCommitFp(tmq, TSDB_CODE_INVALID_PARA, userParam);
    return;
  }

  // extract topic name and vgroup id from whichever response object was passed
  if (TD_RES_TMQ(pRes)) {
    SMqRspObj* pRspObj = (SMqRspObj*)pRes;
    pTopicName = pRspObj->topic;
    vgId = pRspObj->vgId;
  } else if (TD_RES_TMQ_META(pRes)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)pRes;
    pTopicName = pMetaRspObj->topic;
    vgId = pMetaRspObj->vgId;
  } else if (TD_RES_TMQ_METADATA(pRes)) {
    SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)pRes;
    pTopicName = pRspObj->topic;
    vgId = pRspObj->vgId;
  } else {
    pCommitFp(tmq, TSDB_CODE_TMQ_INVALID_MSG, userParam);
    return;
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    pCommitFp(tmq, TSDB_CODE_OUT_OF_MEMORY, userParam);
    return;
  }

  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;
  pParamSet->callbackFn = pCommitFp;
  pParamSet->userParam = userParam;

  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);

  tscDebug("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId);

  SMqClientTopic* pTopic = getTopicByName(tmq, pTopicName);
  if (pTopic == NULL) {
    tscWarn("consumer:0x%" PRIx64 " failed to find the specified topic:%s, total topics:%d", tmq->consumerId,
            pTopicName, numOfTopics);
    taosMemoryFree(pParamSet);
    pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam);
    return;
  }

  // locate the vgroup inside the topic; j == numOfVgroups means "not found"
  int32_t j = 0;
  int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
  for (j = 0; j < numOfVgroups; j++) {
    SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
    if (pVg->vgId == vgId) {
      break;
    }
  }

  if (j == numOfVgroups) {
    tscWarn("consumer:0x%" PRIx64 " failed to find the specified vgId:%d, total Vgs:%d, topic:%s", tmq->consumerId,
            vgId, numOfVgroups, pTopicName);
    taosMemoryFree(pParamSet);
    pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam);
    return;
  }

  SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
  if (pVg->offsetInfo.currentOffset.type > 0 && !tOffsetEqual(&pVg->offsetInfo.currentOffset, &pVg->offsetInfo.committedOffset)) {
    code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups, type);

    // failed to commit, callback user function directly.
    if (code != TSDB_CODE_SUCCESS) {
      taosMemoryFree(pParamSet);
      pCommitFp(tmq, code, userParam);
    }
  } else {  // do not perform commit, callback user function directly.
    taosMemoryFree(pParamSet);
    pCommitFp(tmq, code, userParam);
  }
}

684
/**
 * Asynchronously commit the current offset of every vgroup of every subscribed
 * topic whose current offset differs from the committed one.
 *
 * pCommitFp is invoked directly with TSDB_CODE_OUT_OF_MEMORY when the shared
 * parameter set cannot be allocated, or with TSDB_CODE_SUCCESS when no request
 * was sent; otherwise completion is reported through commitRspCountDown().
 *
 * @param tmq        consumer
 * @param pCommitFp  completion callback
 * @param userParam  opaque argument passed to pCommitFp
 */
static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) {
  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    pCommitFp(tmq, TSDB_CODE_OUT_OF_MEMORY, userParam);
    return;
  }

  pParamSet->refId = tmq->refId;
  pParamSet->epoch = tmq->epoch;
  pParamSet->callbackFn = pCommitFp;
  pParamSet->userParam = userParam;

  // init as 1 to prevent concurrency issue
  pParamSet->waitingRspNum = 1;

  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
  tscDebug("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics);

  for (int32_t i = 0; i < numOfTopics; i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    int32_t         numOfVgroups = taosArrayGetSize(pTopic->vgs);

    tscDebug("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName,
             numOfVgroups);
    for (int32_t j = 0; j < numOfVgroups; j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);

      if (pVg->offsetInfo.currentOffset.type > 0 && !tOffsetEqual(&pVg->offsetInfo.currentOffset, &pVg->offsetInfo.committedOffset)) {
        int32_t code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups, TDMT_VND_TMQ_COMMIT_OFFSET);
        if (code != TSDB_CODE_SUCCESS) {
          // report the code returned by doSendCommitMsg; it does not set terrno
          tscError("consumer:0x%" PRIx64 " topic:%s vgId:%d offset:%" PRId64 " failed, code:%s ordinal:%d/%d",
                   tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->offsetInfo.committedOffset.version, tstrerror(code),
                   j + 1, numOfVgroups);
          continue;
        }

        // update the offset value.
        pVg->offsetInfo.committedOffset = pVg->offsetInfo.currentOffset;
      } else {
        tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, current:%" PRId64 ", ordinal:%d/%d",
                 tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->offsetInfo.currentOffset.version, j + 1, numOfVgroups);
      }
    }
  }

  tscDebug("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - 1,
           numOfTopics);

  // no request is sent
  if (pParamSet->totalRspNum == 0) {
    taosMemoryFree(pParamSet);
    pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam);
    return;
  }

  // count down since waiting rsp num init as 1
  commitRspCountDown(pParamSet, tmq->consumerId, "", 0);
}

743 744
static void generateTimedTask(int64_t refId, int32_t type) {
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
745
  if (tmq != NULL) {
S
Shengliang Guan 已提交
746
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
747
    *pTaskType = type;
748 749
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
wmmhello's avatar
wmmhello 已提交
750
    taosReleaseRef(tmqMgmt.rsetId, refId);
751
  }
752 753 754 755 756
}

/**
 * Timer callback: queue an ask-ep task for the consumer.
 *
 * @param param  heap-allocated int64_t holding the consumer refId; released here
 * @param tmrId  timer handle (unused)
 */
void tmqAssignAskEpTask(void* param, void* tmrId) {
  int64_t refId = *(int64_t*)param;
  generateTimedTask(refId, TMQ_DELAYED_TASK__ASK_EP);
  taosMemoryFree(param);
}

void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
761
  int64_t refId = *(int64_t*)param;
762
  generateTimedTask(refId, TMQ_DELAYED_TASK__COMMIT);
763
  taosMemoryFree(param);
L
Liu Jicong 已提交
764 765 766
}

void tmqAssignDelayedReportTask(void* param, void* tmrId) {
767 768 769
  int64_t refId = *(int64_t*)param;
  tmq_t*  tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (tmq != NULL) {
S
Shengliang Guan 已提交
770
    int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
771 772 773 774
    *pTaskType = TMQ_DELAYED_TASK__REPORT;
    taosWriteQitem(tmq->delayedTask, pTaskType);
    tsem_post(&tmq->rspSem);
  }
775 776

  taosReleaseRef(tmqMgmt.rsetId, refId);
777
  taosMemoryFree(param);
L
Liu Jicong 已提交
778 779
}

780
/**
 * Response callback of the heartbeat request: only releases the response
 * buffers, the content and the code are ignored.
 *
 * @param param  unused
 * @param pMsg   response buffers, may be NULL
 * @param code   response code (unused)
 * @return always 0
 */
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg != NULL) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
  }
  return 0;
}

void tmqSendHbReq(void* param, void* tmrId) {
789
  int64_t refId = *(int64_t*)param;
790

X
Xiaoyu Wang 已提交
791
  tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
792
  if (tmq == NULL) {
L
Liu Jicong 已提交
793
    taosMemoryFree(param);
794 795
    return;
  }
D
dapan1121 已提交
796 797 798 799

  SMqHbReq req = {0};
  req.consumerId = tmq->consumerId;
  req.epoch = tmq->epoch;
wmmhello's avatar
wmmhello 已提交
800
//  if(tmq->needReportOffsetRows){
801 802 803 804 805 806 807 808 809 810 811 812
    req.topics = taosArrayInit(taosArrayGetSize(tmq->clientTopics), sizeof(TopicOffsetRows));
    for(int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++){
      SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
      int32_t         numOfVgroups = taosArrayGetSize(pTopic->vgs);
      TopicOffsetRows* data = taosArrayReserve(req.topics, 1);
      strcpy(data->topicName, pTopic->topicName);
      data->offsetRows = taosArrayInit(numOfVgroups, sizeof(OffsetRows));
      for(int j = 0; j < numOfVgroups; j++){
        SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
        OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1);
        offRows->vgId = pVg->vgId;
        offRows->rows = pVg->numOfRows;
wmmhello's avatar
wmmhello 已提交
813
        offRows->offset = pVg->offsetInfo.currentOffset;
814 815 816
        char buf[TSDB_OFFSET_LEN] = {0};
        tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset);
        tscInfo("report offset: vgId:%d, offset:%s, rows:%"PRId64, offRows->vgId, buf, offRows->rows);
817
      }
818
    }
wmmhello's avatar
wmmhello 已提交
819 820
//    tmq->needReportOffsetRows = false;
//  }
D
dapan1121 已提交
821

L
Liu Jicong 已提交
822
  int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req);
D
dapan1121 已提交
823 824
  if (tlen < 0) {
    tscError("tSerializeSMqHbReq failed");
825
    goto OVER;
D
dapan1121 已提交
826
  }
827

L
Liu Jicong 已提交
828
  void* pReq = taosMemoryCalloc(1, tlen);
D
dapan1121 已提交
829 830
  if (tlen < 0) {
    tscError("failed to malloc MqHbReq msg, size:%d", tlen);
831
    goto OVER;
D
dapan1121 已提交
832
  }
833

D
dapan1121 已提交
834 835 836
  if (tSerializeSMqHbReq(pReq, tlen, &req) < 0) {
    tscError("tSerializeSMqHbReq %d failed", tlen);
    taosMemoryFree(pReq);
837
    goto OVER;
D
dapan1121 已提交
838
  }
839 840 841 842

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(pReq);
L
Liu Jicong 已提交
843
    goto OVER;
844
  }
845

846
  sendInfo->msgInfo = (SDataBuf){ .pData = pReq, .len = tlen, .handle = NULL };
847 848 849 850 851

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = NULL;
  sendInfo->fp = tmqHbCb;
L
Liu Jicong 已提交
852
  sendInfo->msgType = TDMT_MND_TMQ_HB;
853 854 855 856 857 858 859

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

OVER:
860
  tDeatroySMqHbReq(&req);
861
  taosTmrReset(tmqSendHbReq, 1000, param, tmqMgmt.timer, &tmq->hbLiveTimer);
862
  taosReleaseRef(tmqMgmt.rsetId, refId);
863 864
}

865 866
// Commit callback used when the consumer has no user supplied one: logs an error when
// the commit finished with a non-zero code and does nothing otherwise. param is unused.
static void defaultCommitCbFn(tmq_t* pTmq, int32_t code, void* param) {
  if (code == 0) {
    return;
  }

  tscError("consumer:0x%" PRIx64 ", failed to commit offset, code:%s", pTmq->consumerId, tstrerror(code));
}

871
// Drain pTmq->delayedTask and run every queued task:
//   TMQ_DELAYED_TASK__ASK_EP  - start an async ask-ep and re-arm the ep timer (1000ms)
//   TMQ_DELAYED_TASK__COMMIT  - start an async commit of all offsets and re-arm the
//                               commit timer (autoCommitInterval ms)
//   TMQ_DELAYED_TASK__REPORT  - nothing to do here
//
// Each re-armed timer receives a freshly allocated int64_t holding the consumer refId.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the qall or one of the
// refId holders could not be allocated. In the latter case the remaining tasks are
// still processed and freed, only the affected timer is not re-armed.
int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
  STaosQall* qall = taosAllocateQall();
  if (qall == NULL) {
    tscError("consumer:0x%" PRIx64 " failed to allocate qall for delayed tasks", pTmq->consumerId);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  taosReadAllQitems(pTmq->delayedTask, qall);

  if (qall->numOfItems == 0) {
    taosFreeQall(qall);
    return TSDB_CODE_SUCCESS;
  }

  tscDebug("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, qall->numOfItems);

  int32_t code = TSDB_CODE_SUCCESS;
  int8_t* pTaskType = NULL;
  taosGetQitem(qall, (void**)&pTaskType);

  while (pTaskType != NULL) {
    if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
      asyncAskEp(pTmq, addToQueueCallbackFn, NULL);

      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      if (pRefId == NULL) {
        tscError("consumer:0x%" PRIx64 " failed to allocate refId for ep timer", pTmq->consumerId);
        code = TSDB_CODE_OUT_OF_MEMORY;
      } else {
        *pRefId = pTmq->refId;

        tscDebug("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId);
        taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer);
      }
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
      tmq_commit_cb* pCallbackFn = pTmq->commitCb ? pTmq->commitCb : defaultCommitCbFn;

      asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam);

      int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
      if (pRefId == NULL) {
        tscError("consumer:0x%" PRIx64 " failed to allocate refId for commit timer", pTmq->consumerId);
        code = TSDB_CODE_OUT_OF_MEMORY;
      } else {
        *pRefId = pTmq->refId;

        tscDebug("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId,
                 pTmq->autoCommitInterval / 1000.0);
        taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer);
      }
    } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
      // report tasks carry no work on this path
    }

    taosFreeQitem(pTaskType);
    taosGetQitem(qall, (void**)&pTaskType);
  }

  taosFreeQall(qall);
  return code;
}

914
// Release everything a queued response wrapper owns, selected by its tmqRspType.
// The wrapper itself is not freed here (callers follow up with taosFreeQitem).
// Unknown types and TMQ_MSG_TYPE__END_RSP own nothing. Always returns NULL.
static void* tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
  switch (rspWrapper->tmqRspType) {
    case TMQ_MSG_TYPE__END_RSP:
      // do nothing
      break;

    case TMQ_MSG_TYPE__EP_RSP: {
      SMqAskEpRspWrapper* pEpWrapper = (SMqAskEpRspWrapper*)rspWrapper;
      tDeleteSMqAskEpRsp(&pEpWrapper->msg);
      break;
    }

    case TMQ_MSG_TYPE__POLL_DATA_RSP: {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)rspWrapper;
      taosMemoryFreeClear(pPoll->pEpset);

      taosArrayDestroyP(pPoll->dataRsp.blockData, taosMemoryFree);
      taosArrayDestroy(pPoll->dataRsp.blockDataLen);
      taosArrayDestroyP(pPoll->dataRsp.blockTbName, taosMemoryFree);
      taosArrayDestroyP(pPoll->dataRsp.blockSchema, (FDelete)tDeleteSchemaWrapper);
      break;
    }

    case TMQ_MSG_TYPE__POLL_META_RSP: {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)rspWrapper;
      taosMemoryFreeClear(pPoll->pEpset);

      taosMemoryFree(pPoll->metaRsp.metaRsp);
      break;
    }

    case TMQ_MSG_TYPE__POLL_DATA_META_RSP: {
      SMqPollRspWrapper* pPoll = (SMqPollRspWrapper*)rspWrapper;
      taosMemoryFreeClear(pPoll->pEpset);

      taosArrayDestroyP(pPoll->taosxRsp.blockData, taosMemoryFree);
      taosArrayDestroy(pPoll->taosxRsp.blockDataLen);
      taosArrayDestroyP(pPoll->taosxRsp.blockTbName, taosMemoryFree);
      taosArrayDestroyP(pPoll->taosxRsp.blockSchema, (FDelete)tDeleteSchemaWrapper);
      // taosx
      taosArrayDestroy(pPoll->taosxRsp.createTableLen);
      taosArrayDestroyP(pPoll->taosxRsp.createTableReq, taosMemoryFree);
      break;
    }

    default:
      break;
  }

  return NULL;
}

L
Liu Jicong 已提交
949
// Discard every response that has not been handed to the user yet: first the items
// already sitting in tmq->qall, then whatever is still in tmq->mqueue (moved into
// tmq->qall and drained the same way). Each item's payload is released with
// tmqFreeRspWrapper() before the item itself is freed.
void tmqClearUnhandleMsg(tmq_t* tmq) {
  SMqRspWrapper* pItem = NULL;

  // pass 1: items already fetched into qall
  taosGetQitem(tmq->qall, (void**)&pItem);
  while (pItem != NULL) {
    tmqFreeRspWrapper(pItem);
    taosFreeQitem(pItem);
    taosGetQitem(tmq->qall, (void**)&pItem);
  }

  // pass 2: items still waiting in mqueue
  pItem = NULL;
  taosReadAllQitems(tmq->mqueue, tmq->qall);
  taosGetQitem(tmq->qall, (void**)&pItem);
  while (pItem != NULL) {
    tmqFreeRspWrapper(pItem);
    taosFreeQitem(pItem);
    taosGetQitem(tmq->qall, (void**)&pItem);
  }
}

D
dapan1121 已提交
974
// Response callback of the subscribe request: stores the response code in the waiting
// caller's SMqSubscribeCbParam, frees the buffers attached to pMsg and posts rspSem so
// the waiter can continue. Always returns 0.
//
// pParam must not be touched after tsem_post(): tmq_subscribe keeps it on its stack and
// destroys the semaphore as soon as the wait returns.
int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
  pParam->rspErr = code;

  // same buffer handling as the other response callbacks in this file
  if (pMsg) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
  }

  tsem_post(&pParam->rspSem);
  return 0;
}
982

L
Liu Jicong 已提交
983
// Append the names of all topics the consumer currently knows to *topics.
//
// tmq:    consumer handle
// topics: in/out list; when *topics is NULL a new list is created and stored there
//
// Returns 0 on success, TSDB_CODE_OUT_OF_MEMORY when the list had to be created and
// could not be.
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  if (*topics == NULL) {
    *topics = tmq_list_new();
    if (*topics == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < numOfTopics; i++) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);

    // Only the part after the first '.' is reported (presumably the stored name is the
    // full "<prefix>.<topic>" form built in tmq_subscribe - confirm). A name without
    // any '.' is reported unchanged instead of computing a pointer from NULL.
    char* pDot = strchr(topic->topicName, '.');
    char* name = (pDot != NULL) ? pDot + 1 : topic->topicName;
    tmq_list_append(*topics, name);
  }
  return 0;
}

L
Liu Jicong 已提交
994
// Drop all subscriptions of the consumer by subscribing to an empty topic list.
//
// With auto commit enabled the current offsets are committed synchronously first; a
// failing commit aborts the call and its code is returned. The subscribe itself is
// retried (500ms apart) while the mnode answers TSDB_CODE_MND_CONSUMER_NOT_READY,
// giving up once the retry counter exceeds 5. Returns the code of the last
// tmq_subscribe call.
int32_t tmq_unsubscribe(tmq_t* tmq) {
  if (tmq->autoCommit) {
    int32_t commitCode = tmq_commit_sync(tmq, NULL);
    if (commitCode != 0) {
      return commitCode;
    }
  }
  taosSsleep(2);  // sleep 2s for hb to send offset and rows to server

  tmq_list_t* emptyList = tmq_list_new();
  int32_t     rsp = 0;

  for (int32_t retryCnt = 0;; retryCnt++) {
    rsp = tmq_subscribe(tmq, emptyList);
    if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
      break;
    }
    taosMsleep(500);
  }

  tmq_list_destroy(emptyList);
  return rsp;
}

1020 1021 1022 1023 1024 1025
static void freeClientVgImpl(void* param) {
  SMqClientTopic* pTopic = param;
  taosMemoryFreeClear(pTopic->schema.pSchema);
  taosArrayDestroy(pTopic->vgs);
}

1026
void tmqFreeImpl(void* handle) {
1027 1028
  tmq_t*  tmq = (tmq_t*)handle;
  int64_t id = tmq->consumerId;
L
Liu Jicong 已提交
1029

1030
  // TODO stop timer
L
Liu Jicong 已提交
1031 1032 1033 1034
  if (tmq->mqueue) {
    tmqClearUnhandleMsg(tmq);
    taosCloseQueue(tmq->mqueue);
  }
L
Liu Jicong 已提交
1035

H
Haojun Liao 已提交
1036 1037 1038 1039 1040
  if (tmq->delayedTask) {
    taosCloseQueue(tmq->delayedTask);
  }

  taosFreeQall(tmq->qall);
1041
  tsem_destroy(&tmq->rspSem);
L
Liu Jicong 已提交
1042

1043
  taosArrayDestroyEx(tmq->clientTopics, freeClientVgImpl);
1044 1045
  taos_close_internal(tmq->pTscObj);
  taosMemoryFree(tmq);
1046 1047

  tscDebug("consumer:0x%" PRIx64 " closed", id);
L
Liu Jicong 已提交
1048 1049
}

1050 1051 1052 1053 1054 1055 1056 1057 1058
static void tmqMgmtInit(void) {
  tmqInitRes = 0;
  tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");

  if (tmqMgmt.timer == NULL) {
    tmqInitRes = TSDB_CODE_OUT_OF_MEMORY;
  }

  tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
1059
  if (tmqMgmt.rsetId < 0) {
1060 1061 1062 1063
    tmqInitRes = terrno;
  }
}

L
Liu Jicong 已提交
1064
// Create a consumer from the given configuration.
//
// conf:      consumer configuration; groupId must not be empty
// errstr:    not used by this implementation
// errstrLen: not used by this implementation
//
// Returns the new consumer, or NULL on failure (terrno is set on the paths that assign
// it below). On success the consumer is registered in tmqMgmt.rsetId, and when
// conf->hbBgEnable is set a heartbeat timer (tmqSendHbReq, 1000ms) is started.
//
// Semaphore lifecycle: tmqFreeImpl() destroys rspSem unconditionally, so rspSem is
// initialised before any path that can reach _failed, and no failure path destroys it
// by hand.
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  taosThreadOnce(&tmqInit, tmqMgmtInit);
  if (tmqInitRes != 0) {
    terrno = tmqInitRes;
    return NULL;
  }

  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tscError("failed to create consumer, groupId:%s, code:%s", conf->groupId, terrstr());
    return NULL;
  }

  // assign consumerId first so that every later log line carries it
  pTmq->consumerId = tGenIdPI64();

  // init semaphore; nothing else is owned yet, so a failure only frees the struct
  if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
    tscError("consumer:0x %" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
             conf->groupId);
    taosMemoryFree(pTmq);
    return NULL;
  }

  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  pTmq->mqueue = taosOpenQueue();
  pTmq->delayedTask = taosOpenQueue();
  pTmq->qall = taosAllocateQall();

  if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL ||
      conf->groupId[0] == 0) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    // pTmq->groupId has not been filled in yet, report the configured one
    tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), conf->groupId);
    goto _failed;
  }

  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;

  // set conf
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
  pTmq->withTbName = conf->withTbName;
  pTmq->useSnapshot = conf->snapEnable;
  pTmq->autoCommit = conf->autoCommit;
  pTmq->autoCommitInterval = conf->autoCommitInterval;
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
  pTmq->resetOffsetCfg = conf->resetOffset;
  taosInitRWLatch(&pTmq->lock);

  pTmq->hbBgEnable = conf->hbBgEnable;

  // init connection
  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) {
    tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId);
    goto _failed;  // rspSem is destroyed by tmqFreeImpl
  }

  pTmq->refId = taosAddRef(tmqMgmt.rsetId, pTmq);
  if (pTmq->refId < 0) {
    goto _failed;
  }

  if (pTmq->hbBgEnable) {
    int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId);
      // The consumer is already registered in the ref set; remove the ref instead of
      // freeing directly (presumably this runs tmqFreeImpl once the count drops - confirm).
      taosRemoveRef(tmqMgmt.rsetId, pTmq->refId);
      return NULL;
    }

    *pRefId = pTmq->refId;
    pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pRefId, tmqMgmt.timer);
  }

  char         buf[TSDB_OFFSET_LEN] = {0};
  STqOffsetVal offset = {.type = pTmq->resetOffsetCfg};
  tFormatOffset(buf, tListLen(buf), &offset);
  tscInfo("consumer:0x%" PRIx64 " is setup, refId:%" PRId64
          ", groupId:%s, snapshot:%d, autoCommit:%d, commitInterval:%dms, offset:%s, backgroudHB:%d",
          pTmq->consumerId, pTmq->refId, pTmq->groupId, pTmq->useSnapshot, pTmq->autoCommit, pTmq->autoCommitInterval,
          buf, pTmq->hbBgEnable);

  return pTmq;

_failed:
  tmqFreeImpl(pTmq);
  return NULL;
}

L
Liu Jicong 已提交
1157
// Subscribe the consumer to the topics in topic_list (an empty list unsubscribes).
//
// Steps: build and send a TDMT_MND_TMQ_SUBSCRIBE request to the mnode, wait for its
// response, poll doAskEp() every 500ms until the mnode stops answering
// TSDB_CODE_MND_CONSUMER_NOT_READY (bounded by MAX_RETRY_COUNT), then start the ep
// timer and, with auto commit enabled, the commit timer if they are not running yet.
//
// Returns 0 on success or an error code: TSDB_CODE_OUT_OF_MEMORY for any failed
// allocation, TSDB_CODE_TSC_INTERNAL_ERROR when the wait semaphore cannot be created,
// the response code of the mnode, or TSDB_CODE_MND_CONSUMER_NOT_READY after too many
// retries.
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
  const int32_t   MAX_RETRY_COUNT = 120 * 2;  // let's wait for 2 mins at most
  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
  SMsgSendInfo*   sendInfo = NULL;
  SCMSubscribeReq req = {0};
  int32_t         code = 0;

  tscInfo("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz);

  req.consumerId = tmq->consumerId;
  tstrncpy(req.clientId, tmq->clientId, 256);
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
  req.topicNames = taosArrayInit(sz, sizeof(void*));

  if (req.topicNames == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  req.withTbName = tmq->withTbName;
  req.autoCommit = tmq->autoCommit;
  req.autoCommitInterval = tmq->autoCommitInterval;
  req.resetOffsetCfg = tmq->resetOffsetCfg;

  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);

    SName name = {0};
    tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));
    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;  // without this the failure was reported as success
      goto FAIL;
    }

    tNameExtractFullName(&name, topicFName);
    tscInfo("consumer:0x%" PRIx64 " subscribe topic:%s", tmq->consumerId, topicFName);

    if (taosArrayPush(req.topicNames, &topicFName) == NULL) {
      // the array did not take ownership, release the name here
      taosMemoryFree(topicFName);
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
  }

  // first call only computes the encoded length
  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  // lives on this stack frame; tmqSubscribeCb fills rspErr and posts rspSem
  SMqSubscribeCbParam param = { .rspErr = 0};
  if (tsem_init(&param.rspSem, 0, 0) != 0) {
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
    goto FAIL;
  }

  sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = tlen, .handle = NULL};

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
  sendInfo->msgType = TDMT_MND_TMQ_SUBSCRIBE;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  // avoid double free if msg is sent
  buf = NULL;
  sendInfo = NULL;

  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);

  if (param.rspErr != 0) {
    code = param.rspErr;
    goto FAIL;
  }

  int32_t retryCnt = 0;
  while (TSDB_CODE_MND_CONSUMER_NOT_READY == doAskEp(tmq)) {
    if (retryCnt++ > MAX_RETRY_COUNT) {
      tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt);
      code = TSDB_CODE_MND_CONSUMER_NOT_READY;
      goto FAIL;
    }

    tscInfo("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt);
    taosMsleep(500);
  }

  // init ep timer
  if (tmq->epTimer == NULL) {
    int64_t* pRefId1 = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId1 == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
    *pRefId1 = tmq->refId;
    tmq->epTimer = taosTmrStart(tmqAssignAskEpTask, 1000, pRefId1, tmqMgmt.timer);
  }

  // init auto commit timer
  if (tmq->autoCommit && tmq->commitTimer == NULL) {
    int64_t* pRefId2 = taosMemoryMalloc(sizeof(int64_t));
    if (pRefId2 == NULL) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      goto FAIL;
    }
    *pRefId2 = tmq->refId;
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId2, tmqMgmt.timer);
  }

  // common exit, reached on success as well
FAIL:
  taosArrayDestroyP(req.topicNames, taosMemoryFree);
  taosMemoryFree(buf);
  taosMemoryFree(sendInfo);

  return code;
}

L
Liu Jicong 已提交
1281
// Store the commit callback and its user parameter in the configuration; both are
// copied into the consumer by tmq_consumer_new. A NULL conf is ignored.
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
  if (conf == NULL) {
    return;
  }

  conf->commitCb = cb;
  conf->commitCbUserParam = param;
}
1285

wmmhello's avatar
wmmhello 已提交
1286 1287 1288 1289 1290 1291 1292 1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313
// Look up the vgroup entry with the given vgId among the consumer's topics named
// topicName. Returns the entry stored in the topic's vgs array, or NULL when no topic
// with that name holds such a vgroup.
static SMqClientVg* getVgInfo(tmq_t* tmq, char* topicName, int32_t  vgId){
  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);

  for (int32_t t = 0; t < numOfTopics; t++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, t);
    if (strcmp(pTopic->topicName, topicName) != 0) {
      continue;
    }

    int32_t numOfVgs = taosArrayGetSize(pTopic->vgs);
    for (int32_t v = 0; v < numOfVgs; v++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, v);
      if (pVg->vgId == vgId) {
        return pVg;
      }
    }
  }

  return NULL;
}

// Return the first entry of tmq->clientTopics whose topicName equals topicName, or
// NULL when there is none.
static SMqClientTopic* getTopicInfo(tmq_t* tmq, char* topicName){
  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);

  for (int32_t t = 0; t < numOfTopics; t++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, t);
    if (strcmp(pTopic->topicName, topicName) != 0) {
      continue;
    }
    return pTopic;
  }

  return NULL;
}

D
dapan1121 已提交
1314
// Response callback of a poll request sent to one vgroup.
//
// param is the SMqPollCbParam of the request and is freed on every path. pMsg->pData is
// freed on every path as well; pMsg->pEpSet is either freed or handed over to the
// queued response wrapper.
//
// Outcomes:
//   - consumer already closed: terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED, returns -1
//   - code != 0: handled per error code, then the failure exit is taken (returns -1)
//   - response from an epoch older than the consumer's: dropped, returns 0
//   - otherwise: the response is decoded by type, queued in tmq->mqueue, returns 0
//
// The failure exit puts the vgroup back to TMQ_VG_STATUS__IDLE when the request epoch
// still matches the consumer epoch. Every path that acquired the consumer posts rspSem
// and releases the reference.
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqPollCbParam* pCb = (SMqPollCbParam*)param;
  int64_t         refId = pCb->refId;

  tmq_t* pTmq = taosAcquireRef(tmqMgmt.rsetId, refId);
  if (pTmq == NULL) {
    taosMemoryFree(pCb);
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    return -1;
  }

  int32_t  epoch = pCb->epoch;
  int32_t  vgId = pCb->vgId;
  uint64_t reqId = pCb->requestId;

  if (code != 0) {
    if (pMsg->pData != NULL) {
      taosMemoryFree(pMsg->pData);
    }
    if (pMsg->pEpSet != NULL) {
      taosMemoryFree(pMsg->pEpSet);
    }

    if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      // tell the poller that this vgroup has nothing to deliver
      SMqPollRspWrapper* pEnd = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
      if (pEnd != NULL) {
        pEnd->tmqRspType = TMQ_MSG_TYPE__END_RSP;
        taosWriteQitem(pTmq->mqueue, pEnd);
      } else {
        tscWarn("consumer:0x%" PRIx64 " msg from vgId:%d discarded, epoch %d since out of memory, reqId:0x%" PRIx64,
                pTmq->consumerId, vgId, epoch, reqId);
      }
    } else if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
      // consumer mismatch: only the status is switched to RECOVER, no sleep happens here
      atomic_store_8(&pTmq->status, TMQ_CONSUMER_STATUS__RECOVER);
      tscDebug("consumer:0x%" PRIx64 " wait for the re-balance, wait for 500ms and set status to be RECOVER",
               pTmq->consumerId);
    } else {
      tscError("consumer:0x%" PRIx64 " msg from vgId:%d discarded, epoch %d, since %s, reqId:0x%" PRIx64, pTmq->consumerId,
               vgId, epoch, tstrerror(code), reqId);
    }

    goto CREATE_MSG_FAIL;
  }

  int32_t rspEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t curEpoch = atomic_load_32(&pTmq->epoch);
  if (rspEpoch < curEpoch) {
    // do not write into queue since updating epoch reset
    tscWarn("consumer:0x%" PRIx64
            " msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d, reqId:0x%" PRIx64,
            pTmq->consumerId, vgId, rspEpoch, curEpoch, reqId);

    tsem_post(&pTmq->rspSem);
    taosReleaseRef(tmqMgmt.rsetId, refId);

    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    taosMemoryFree(pCb);

    return 0;
  }

  if (rspEpoch != curEpoch) {
    tscWarn("consumer:0x%" PRIx64 " mismatch rsp from vgId:%d, epoch %d, current epoch %d, reqId:0x%" PRIx64,
            pTmq->consumerId, vgId, rspEpoch, curEpoch, reqId);
  }

  // the response head tells which payload follows
  int8_t msgType = ((SMqRspHead*)pMsg->pData)->mqMsgType;

  SMqPollRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
  if (pWrapper == NULL) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, epoch %d since out of memory", pTmq->consumerId, vgId,
            epoch);
    goto CREATE_MSG_FAIL;
  }

  pWrapper->tmqRspType = msgType;
  pWrapper->reqId = reqId;
  pWrapper->pEpset = pMsg->pEpSet;
  pWrapper->vgId = vgId;
  strcpy(pWrapper->topicName, pCb->topicName);

  // the ep set now belongs to the wrapper
  pMsg->pEpSet = NULL;

  SDecoder decoder;
  switch (msgType) {
    case TMQ_MSG_TYPE__POLL_DATA_RSP: {
      tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
      tDecodeMqDataRsp(&decoder, &pWrapper->dataRsp);
      tDecoderClear(&decoder);
      // the head is copied over the start of the decoded response
      memcpy(&pWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));

      char buf[TSDB_OFFSET_LEN];
      tFormatOffset(buf, TSDB_OFFSET_LEN, &pWrapper->dataRsp.rspOffset);
      tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req ver:%" PRId64 ", rsp:%s type %d, reqId:0x%" PRIx64,
               pTmq->consumerId, vgId, pWrapper->dataRsp.reqOffset.version, buf, msgType, reqId);
      break;
    }

    case TMQ_MSG_TYPE__POLL_META_RSP:
      tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
      tDecodeMqMetaRsp(&decoder, &pWrapper->metaRsp);
      tDecoderClear(&decoder);
      memcpy(&pWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
      break;

    case TMQ_MSG_TYPE__POLL_DATA_META_RSP:
      tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
      tDecodeSTaosxRsp(&decoder, &pWrapper->taosxRsp);
      tDecoderClear(&decoder);
      memcpy(&pWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead));
      break;

    default:  // invalid rspType
      tscError("consumer:0x%" PRIx64 " invalid rsp msg received, type:%d ignored", pTmq->consumerId, msgType);
      break;
  }

  taosMemoryFree(pMsg->pData);
  taosWriteQitem(pTmq->mqueue, pWrapper);

  int32_t numInQueue = taosQueueItemSize(pTmq->mqueue);
  tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64,
           pTmq->consumerId, msgType, vgId, numInQueue, reqId);

  tsem_post(&pTmq->rspSem);
  taosReleaseRef(tmqMgmt.rsetId, refId);
  taosMemoryFree(pCb);

  return 0;

CREATE_MSG_FAIL:
  if (epoch == pTmq->epoch) {
    taosWLockLatch(&pTmq->lock);
    SMqClientVg* pVg = getVgInfo(pTmq, pCb->topicName, vgId);
    if (pVg != NULL) {
      atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
    }
    taosWUnLockLatch(&pTmq->lock);
  }

  tsem_post(&pTmq->rspSem);
  taosReleaseRef(tmqMgmt.rsetId, refId);
  taosMemoryFree(pCb);

  return -1;
}

H
Haojun Liao 已提交
1465
typedef struct SVgroupSaveInfo {
wmmhello's avatar
wmmhello 已提交
1466 1467
  STqOffsetVal currentOffset;
  STqOffsetVal commitOffset;
H
Haojun Liao 已提交
1468 1469 1470
  int64_t      numOfRows;
} SVgroupSaveInfo;

H
Haojun Liao 已提交
1471 1472 1473 1474 1475 1476
// Fill pTopic from one topic entry of an ask-ep response.
//
// The schema is moved out of pTopicEp (its nCols/pSchema are cleared), names are
// copied, and one SMqClientVg is appended to pTopic->vgs per vgroup in the response.
// For a vgroup that has an SVgroupSaveInfo in pVgOffsetHashMap (keyed by
// makeTopicVgroupKey) the saved offsets and row count are reused; all other vgroups
// start from an offset of type tmq->resetOffsetCfg with zero rows.
static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopicEp, SHashObj* pVgOffsetHashMap,
                                   tmq_t* tmq) {
  // take over the schema
  pTopic->schema = pTopicEp->schema;
  pTopicEp->schema.nCols = 0;
  pTopicEp->schema.pSchema = NULL;

  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];
  int32_t numOfVgs = taosArrayGetSize(pTopicEp->vgs);

  tstrncpy(pTopic->topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pTopic->db, pTopicEp->db, TSDB_DB_FNAME_LEN);

  tscInfo("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d", tmq->consumerId, pTopic->topicName, numOfVgs);
  pTopic->vgs = taosArrayInit(numOfVgs, sizeof(SMqClientVg));

  for (int32_t idx = 0; idx < numOfVgs; idx++) {
    SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, idx);

    makeTopicVgroupKey(vgKey, pTopic->topicName, pVgEp->vgId);
    SVgroupSaveInfo* pSaved = taosHashGet(pVgOffsetHashMap, vgKey, strlen(vgKey));

    STqOffsetVal offsetNew = {.type = tmq->resetOffsetCfg};

    // counters not assigned below (pollCnt, vgSkipCnt, emptyBlockReceiveTs) stay zero
    SMqClientVg clientVg = {0};
    clientVg.vgId = pVgEp->vgId;
    clientVg.epSet = pVgEp->epSet;
    clientVg.vgStatus = TMQ_VG_STATUS__IDLE;

    if (pSaved != NULL) {
      clientVg.numOfRows = pSaved->numOfRows;
      clientVg.offsetInfo.currentOffset = pSaved->currentOffset;
      clientVg.offsetInfo.committedOffset = pSaved->commitOffset;
    } else {
      clientVg.numOfRows = 0;
      clientVg.offsetInfo.currentOffset = offsetNew;
      clientVg.offsetInfo.committedOffset = offsetNew;
    }

    clientVg.offsetInfo.walVerBegin = -1;
    clientVg.offsetInfo.walVerEnd = -1;
    clientVg.seekUpdated = false;
    clientVg.receivedInfoFromVnode = false;

    taosArrayPush(pTopic->vgs, &clientVg);
  }
}

static void freeClientVgInfo(void* param) {
  SMqClientTopic* pTopic = param;
  if (pTopic->schema.nCols) {
    taosMemoryFreeClear(pTopic->schema.pSchema);
  }

  taosArrayDestroy(pTopic->vgs);
}

1524
/*
 * Rebuild the consumer's cached topic list from an ask-ep response.
 *
 * Nothing is changed unless `epoch` is strictly newer than tmq->epoch. Otherwise:
 *   1. the offsets and row counters of every currently cached vgroup are stashed in a
 *      temporary hash keyed by the string produced by makeTopicVgroupKey;
 *   2. a fresh topic array is built from pRsp through initClientTopicFromRsp, which
 *      receives that hash;
 *   3. the new array replaces tmq->clientTopics while the write latch is held;
 *   4. the consumer status and epoch are published with atomic stores.
 *
 * NOTE(review): the function returns false on every path, including after a successful
 * update (the original result flag was never set). Confirm that no caller relies on it.
 */
static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);

  tscInfo("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d",
          tmq->consumerId, tmq->epoch, epoch, topicNumGet, topicNumCur);

  // an epoch that is not newer than ours carries nothing to apply
  if (epoch <= tmq->epoch) {
    return false;
  }

  SArray* pFreshTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
  if (pFreshTopics == NULL) {
    return false;
  }

  SHashObj* pSavedOffsets = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pSavedOffsets == NULL) {
    taosArrayDestroy(pFreshTopics);
    return false;
  }

  char vgKey[TSDB_TOPIC_FNAME_LEN + 22];

  // pass 1: remember the progress of every vgroup we currently know about
  for (int32_t topicIdx = 0; topicIdx < topicNumCur; topicIdx++) {
    SMqClientTopic* pOldTopic = taosArrayGet(tmq->clientTopics, topicIdx);
    if (pOldTopic->vgs == NULL) {
      continue;
    }

    int32_t vgNumCur = taosArrayGetSize(pOldTopic->vgs);
    tscInfo("consumer:0x%" PRIx64 ", current vg num: %d", tmq->consumerId, vgNumCur);

    for (int32_t vgIdx = 0; vgIdx < vgNumCur; vgIdx++) {
      SMqClientVg* pOldVg = taosArrayGet(pOldTopic->vgs, vgIdx);
      makeTopicVgroupKey(vgKey, pOldTopic->topicName, pOldVg->vgId);

      char offsetStr[TSDB_OFFSET_LEN];
      tFormatOffset(offsetStr, TSDB_OFFSET_LEN, &pOldVg->offsetInfo.currentOffset);
      tscInfo("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pOldVg->vgId,
              vgKey, offsetStr);

      SVgroupSaveInfo saved = {.currentOffset = pOldVg->offsetInfo.currentOffset,
                               .commitOffset = pOldVg->offsetInfo.committedOffset,
                               .numOfRows = pOldVg->numOfRows};
      taosHashPut(pSavedOffsets, vgKey, strlen(vgKey), &saved, sizeof(SVgroupSaveInfo));
    }
  }

  // pass 2: build the new topic entries from the response
  for (int32_t topicIdx = 0; topicIdx < topicNumGet; topicIdx++) {
    SMqClientTopic newTopic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, topicIdx);
    initClientTopicFromRsp(&newTopic, pTopicEp, pSavedOffsets, tmq);
    taosArrayPush(pFreshTopics, &newTopic);
  }

  taosHashCleanup(pSavedOffsets);

  // swap the topic array while holding the write latch; the old array is destroyed here
  taosWLockLatch(&tmq->lock);
  if (tmq->clientTopics != NULL) {
    taosArrayDestroyEx(tmq->clientTopics, freeClientVgInfo);
  }
  tmq->clientTopics = pFreshTopics;
  taosWUnLockLatch(&tmq->lock);

  int8_t newStatus = TMQ_CONSUMER_STATUS__READY;
  if (topicNumGet == 0) {
    newStatus = TMQ_CONSUMER_STATUS__NO_TOPIC;
  }
  atomic_store_8(&tmq->status, newStatus);
  atomic_store_32(&tmq->epoch, epoch);

  tscInfo("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId);
  return false;
}

1595
/*
 * Transport callback for an ask-ep request.
 *
 * param : SMqAskEpCbParam allocated by the requester; freed here on every path.
 * pMsg  : response buffer; pMsg->pData and pMsg->pEpSet are freed here on every path.
 * code  : result code of the request.
 *
 * Returns `code`, or TSDB_CODE_TMQ_CONSUMER_CLOSED (also stored in terrno) when the
 * consumer referenced by pParam->refId can no longer be acquired.
 *
 * When the consumer is alive the user callback pParam->pUserFn is invoked exactly once:
 * with a NULL buffer on failure, with pMsg on success.
 */
int32_t askEpCallbackFn(void* param, SDataBuf* pMsg, int32_t code) {
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
  int32_t          ret = code;
  tmq_t*           tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);

  if (tmq == NULL) {
    // The consumer is gone. The user callback is not invoked on this path (the call was
    // already disabled in the previous revision of this function).
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    ret = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    goto _free;
  }

  if (code != TSDB_CODE_SUCCESS) {
    tscError("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code));
    pParam->pUserFn(tmq, code, NULL, pParam->pParam);
    goto _release;
  }

  // tmq's epoch is monotonically increasing, so it is safe to discard any message that
  // carries an old epoch. The epoch only advances when a newer ep message is applied.
  SMqRspHead* head = pMsg->pData;
  int32_t     epoch = atomic_load_32(&tmq->epoch);
  if (head->epoch <= epoch) {
    tscInfo("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, no need to update local ep",
            tmq->consumerId, head->epoch, epoch);

    // A consumer in RECOVER state still has to leave that state, even though the
    // endpoint list itself is already up to date.
    if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
      // zero-initialised so that a failed decode cannot leave garbage pointers behind
      SMqAskEpRsp rsp = {0};
      tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
      int8_t flag = (taosArrayGetSize(rsp.topics) == 0) ? TMQ_CONSUMER_STATUS__NO_TOPIC : TMQ_CONSUMER_STATUS__READY;
      atomic_store_8(&tmq->status, flag);
      tDeleteSMqAskEpRsp(&rsp);
    }
  } else {
    tscInfo("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId,
            head->epoch, epoch);
  }

  pParam->pUserFn(tmq, code, pMsg, pParam->pParam);

_release:
  taosReleaseRef(tmqMgmt.rsetId, pParam->refId);

_free:
  taosMemoryFree(pMsg->pEpSet);
  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pParam);
  return ret;
}

L
Liu Jicong 已提交
1652
/*
 * Fill a poll request for one vgroup of one topic.
 *
 * The subscription key is written as <groupId><TMQ_SEPARATOR><topicName>; the remaining
 * fields are copied from the consumer and from the vgroup's current offset, and a fresh
 * request id is generated.
 */
void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
  // compose the subscription key piece by piece
  int32_t groupLen = strlen(tmq->groupId);
  char*   pCursor = pReq->subKey;

  memcpy(pCursor, tmq->groupId, groupLen);
  pCursor += groupLen;
  *pCursor = TMQ_SEPARATOR;
  pCursor += 1;
  strcpy(pCursor, pTopic->topicName);

  // consumer-level settings
  pReq->withTbName = tmq->withTbName;
  pReq->consumerId = tmq->consumerId;
  pReq->timeout = timeout;
  pReq->epoch = tmq->epoch;
  pReq->useSnapshot = tmq->useSnapshot;

  // vgroup-level settings
  pReq->reqOffset = pVg->offsetInfo.currentOffset;
  pReq->head.vgId = pVg->vgId;

  pReq->reqId = generateRequestId();
}

L
Liu Jicong 已提交
1668 1669
/*
 * Build the result object handed to the user for a meta poll response.
 *
 * The topic name, db name and vgId are taken from the wrapper's topic/vgroup handles,
 * and the meta response is copied bytewise into the new object.
 *
 * Returns the new object, or NULL when the allocation fails.
 */
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
  if (pRspObj == NULL) {
    // out of memory: report "no result" instead of dereferencing NULL
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ_META;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;

  memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp));
  return pRspObj;
}

1679
/*
 * Build the result object handed to the user for a data poll response.
 *
 * pWrapper  : dequeued poll response; its dataRsp is copied bytewise into the result.
 * pVg       : vgroup the response belongs to; its numOfRows counter is increased by the
 *             rows found in the response.
 * numOfRows : out parameter, set to the number of rows contained in this response.
 *
 * Returns the new object, or NULL (with *numOfRows == 0) when the allocation fails.
 */
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) {
  (*numOfRows) = 0;

  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  if (pRspObj == NULL) {
    // out of memory: report "no result" instead of dereferencing NULL
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);

  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;

  // when the response carries no schema, use the one cached on the topic
  if (!pWrapper->dataRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  // extract the rows in this data packet
  for (int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) {
    SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pRspObj->rsp.blockData, i);
    int64_t            rows = htobe64(pRetrieve->numOfRows);
    pVg->numOfRows += rows;
    (*numOfRows) += rows;
  }

  return pRspObj;
}

1709
/*
 * Build the result object handed to the user for a combined data+meta (taosx) poll
 * response.
 *
 * pWrapper  : dequeued poll response; its taosxRsp is copied bytewise into the result.
 * pVg       : vgroup the response belongs to; its numOfRows counter is increased by the
 *             rows found in the response.
 * numOfRows : in/out accumulator; the rows of this response are added to it (it is not
 *             reset here).
 *
 * Returns the new object, or NULL (leaving *numOfRows untouched) when the allocation
 * fails.
 */
SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) {
  SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj));
  if (pRspObj == NULL) {
    // out of memory: report "no result" instead of dereferencing NULL
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ_METADATA;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;

  // when the response carries no schema, use the one cached on the topic
  if (!pWrapper->taosxRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  // extract the rows in this data packet
  for (int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) {
    SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pRspObj->rsp.blockData, i);
    int64_t            rows = htobe64(pRetrieve->numOfRows);
    pVg->numOfRows += rows;
    (*numOfRows) += rows;
  }

  return pRspObj;
}

1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759 1760 1761 1762 1763 1764 1765 1766
/*
 * Failure path shared by doTmqPollImpl for errors that happen before the poll request
 * is sent: puts the vgroup back into IDLE state, posts the consumer's response
 * semaphore, and returns -1.
 */
static int32_t handleErrorBeforePoll(SMqClientVg* pVg, tmq_t* pTmq) {
  const int32_t pollNotSent = -1;

  atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  tsem_post(&pTmq->rspSem);

  return pollNotSent;
}

/*
 * Serialize and send one poll request for (pTopic, pVg).
 *
 * Returns TSDB_CODE_SUCCESS once the request has been handed to asyncSendMsgToServer.
 * Any failure before that point frees what was allocated so far and returns the result
 * of handleErrorBeforePoll (which resets the vgroup to IDLE and returns -1).
 *
 * NOTE(review): the return value of asyncSendMsgToServer is not inspected; confirm how
 * a send failure is reported back (presumably through the tmqPollCb callback).
 */
static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* pVg, int64_t timeout) {
  SMqPollReq req = {0};
  tmqBuildConsumeReqImpl(&req, pTmq, timeout, pTopic, pVg);

  // first call only computes the encoded size
  int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
  if (msgSize < 0) {
    return handleErrorBeforePoll(pVg, pTmq);
  }

  char* msg = taosMemoryCalloc(1, msgSize);
  if (NULL == msg) {
    return handleErrorBeforePoll(pVg, pTmq);
  }

  if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
    taosMemoryFree(msg);
    return handleErrorBeforePoll(pVg, pTmq);
  }

  SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
  if (pParam == NULL) {
    taosMemoryFree(msg);
    return handleErrorBeforePoll(pVg, pTmq);
  }

  // The callback parameter identifies the topic/vgroup by name and id rather than by
  // pointer, because pTopic/pVg may be released before the response arrives.
  pParam->refId = pTmq->refId;
  pParam->epoch = pTmq->epoch;
  // bounded copy: never write past the end of pParam->topicName
  tstrncpy(pParam->topicName, pTopic->topicName, tListLen(pParam->topicName));
  pParam->vgId = pVg->vgId;
  pParam->requestId = req.reqId;

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(pParam);
    taosMemoryFree(msg);
    return handleErrorBeforePoll(pVg, pTmq);
  }

  sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
  sendInfo->requestId = req.reqId;
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqPollCb;
  sendInfo->msgType = TDMT_VND_TMQ_CONSUME;

  int64_t transporterId = 0;
  char    offsetFormatBuf[TSDB_OFFSET_LEN];
  tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.currentOffset);

  tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, pTmq->consumerId,
           pTopic->topicName, pVg->vgId, pTmq->epoch, offsetFormatBuf, req.reqId);
  asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);

  pVg->pollCnt++;
  pVg->seekUpdated = false;  // reset this flag.
  pTmq->pollCnt++;

  return TSDB_CODE_SUCCESS;
}

1802
// broadcast the poll request to all related vnodes
H
Haojun Liao 已提交
1803
static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
1804 1805 1806
  if(atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER){
    return 0;
  }
1807
  int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
X
Xiaoyu Wang 已提交
1808
  tscDebug("consumer:0x%" PRIx64 " start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics);
1809 1810

  for (int i = 0; i < numOfTopics; i++) {
X
Xiaoyu Wang 已提交
1811
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
X
Xiaoyu Wang 已提交
1812
    int32_t         numOfVg = taosArrayGetSize(pTopic->vgs);
1813 1814

    for (int j = 0; j < numOfVg; j++) {
X
Xiaoyu Wang 已提交
1815
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
X
Xiaoyu Wang 已提交
1816
      if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) {  // less than 100ms
1817
        tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId,
X
Xiaoyu Wang 已提交
1818
                 tmq->epoch, pVg->vgId);
H
Haojun Liao 已提交
1819 1820 1821
        continue;
      }

1822
      int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
1823
      if (vgStatus == TMQ_VG_STATUS__WAIT) {
L
Liu Jicong 已提交
1824
        int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
1825
        tscTrace("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch,
X
Xiaoyu Wang 已提交
1826
                 pVg->vgId, vgSkipCnt);
X
Xiaoyu Wang 已提交
1827 1828
        continue;
      }
1829

L
Liu Jicong 已提交
1830
      atomic_store_32(&pVg->vgSkipCnt, 0);
1831 1832 1833
      int32_t code = doTmqPollImpl(tmq, pTopic, pVg, timeout);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
D
dapan1121 已提交
1834
      }
X
Xiaoyu Wang 已提交
1835 1836
    }
  }
1837

1838
  tscDebug("consumer:0x%" PRIx64 " end to poll data", tmq->consumerId);
X
Xiaoyu Wang 已提交
1839 1840 1841
  return 0;
}

H
Haojun Liao 已提交
1842
/*
 * Handle a queued response that is not a poll response.
 *
 * Only ask-ep responses (TMQ_MSG_TYPE__EP_RSP) are understood; any other type returns -1
 * and leaves *pReset untouched.
 *
 * For an ask-ep response whose epoch is newer than the consumer's, the local endpoint
 * list is updated and *pReset is set to true. Otherwise the wrapper's contents are
 * released through tmqFreeRspWrapper and *pReset is set to false. Both cases return 0.
 */
static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType != TMQ_MSG_TYPE__EP_RSP) {
    return -1;
  }

  // stale endpoint info: nothing to apply
  if (rspWrapper->epoch <= atomic_load_32(&tmq->epoch)) {
    tmqFreeRspWrapper(rspWrapper);
    *pReset = false;
    return 0;
  }

  SMqAskEpRsp* pEpMsg = &((SMqAskEpRspWrapper*)rspWrapper)->msg;
  doUpdateLocalEp(tmq, rspWrapper->epoch, pEpMsg);
  tDeleteSMqAskEpRsp(pEpMsg);
  *pReset = true;
  return 0;
}

H
Haojun Liao 已提交
1862
/*
 * Drain the consumer's response queue until one response yields a result object.
 *
 * tmq         : consumer handle.
 * timeout     : only forwarded to tmqPollImpl when a re-poll is triggered.
 * pollIfReset : when true, a successful endpoint update immediately triggers a new poll.
 *
 * Returns
 *   - a SMqRspObj / SMqMetaRspObj / SMqTaosxRspObj (as void*) for the first response that
 *     carries data or meta for the current epoch, or
 *   - NULL when the queue is empty, when an END response is seen (terrno is then set to
 *     TSDB_CODE_TQ_NO_COMMITTED_OFFSET), or when the topic/vgroup named by a response is
 *     no longer known to the consumer.
 *
 * Every dequeued wrapper is released before this function returns or moves on to the
 * next item. When a result object is built, the response payload is handed over to it
 * and only the queue item itself is freed.
 */
static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
  tscDebug("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, tmq->qall->numOfItems);

  while (1) {
    SMqRspWrapper* pRspWrapper = NULL;
    taosGetQitem(tmq->qall, (void**)&pRspWrapper);

    if (pRspWrapper == NULL) {
      // local batch exhausted: pull everything that arrived meanwhile and retry once
      taosReadAllQitems(tmq->mqueue, tmq->qall);
      taosGetQitem(tmq->qall, (void**)&pRspWrapper);
      if (pRspWrapper == NULL) {
        return NULL;
      }
    }

    tscDebug("consumer:0x%" PRIx64 " handle rsp, type:%d", tmq->consumerId, pRspWrapper->tmqRspType);

    if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
      taosFreeQitem(pRspWrapper);
      terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
      tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId, tstrerror(terrno));
      return NULL;
    } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;

      int32_t     consumerEpoch = atomic_load_32(&tmq->epoch);
      SMqDataRsp* pDataRsp = &pollRspWrapper->dataRsp;

      if (pDataRsp->head.epoch == consumerEpoch) {
        SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
        pollRspWrapper->vgHandle = pVg;
        pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName);
        if (pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL) {
          tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
                   pollRspWrapper->topicName, pollRspWrapper->vgId);
          // release the dequeued response before bailing out, otherwise it is leaked
          pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
          taosFreeQitem(pollRspWrapper);
          return NULL;
        }

        // update the epset
        if (pollRspWrapper->pEpset != NULL) {
          SEp* pEp = GET_ACTIVE_EP(pollRspWrapper->pEpset);
          SEp* pOld = GET_ACTIVE_EP(&(pVg->epSet));
          tscDebug("consumer:0x%" PRIx64 " update epset vgId:%d, ep:%s:%d, old ep:%s:%d", tmq->consumerId, pVg->vgId,
                   pEp->fqdn, pEp->port, pOld->fqdn, pOld->port);
          pVg->epSet = *pollRspWrapper->pEpset;
        }

        // update the local offset value only for the returned values, only when the local offset is NOT updated
        // by tmq_offset_seek function
        if (!pVg->seekUpdated) {
          tscDebug("consumer:0x%" PRIx64 " local offset is update, since seekupdate not set", tmq->consumerId);
          pVg->offsetInfo.currentOffset = pDataRsp->rspOffset;
        } else {
          tscDebug("consumer:0x%" PRIx64 " local offset is NOT update, since seekupdate is set", tmq->consumerId);
        }

        // update the status
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

        // update the valid wal version range
        pVg->offsetInfo.walVerBegin = pDataRsp->head.walsver;
        pVg->offsetInfo.walVerEnd = pDataRsp->head.walever;
        pVg->receivedInfoFromVnode = true;

        char buf[TSDB_OFFSET_LEN];
        tFormatOffset(buf, TSDB_OFFSET_LEN, &pDataRsp->rspOffset);
        if (pDataRsp->blockNum == 0) {
          tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64
                   ", total:%" PRId64 ", reqId:0x%" PRIx64,
                   tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId);
          pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
          // remember when the empty block arrived so the next poll can back off
          pVg->emptyBlockReceiveTs = taosGetTimestampMs();
          taosFreeQitem(pollRspWrapper);
        } else {  // build rsp
          int64_t    numOfRows = 0;
          SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
          tmq->totalRows += numOfRows;
          pVg->emptyBlockReceiveTs = 0;
          tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
                   ", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64,
                   tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, tmq->totalRows,
                   pollRspWrapper->reqId);
          taosFreeQitem(pollRspWrapper);
          return pRsp;
        }
      } else {
        tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
                 tmq->consumerId, pollRspWrapper->vgId, pDataRsp->head.epoch, consumerEpoch);
        pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
        taosFreeQitem(pollRspWrapper);
      }
    } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
      // todo handle the wal range and epset for each vgroup
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
      int32_t            consumerEpoch = atomic_load_32(&tmq->epoch);

      tscDebug("consumer:0x%" PRIx64 " process meta rsp", tmq->consumerId);

      if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) {
        SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
        pollRspWrapper->vgHandle = pVg;
        pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName);
        if (pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL) {
          tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
                   pollRspWrapper->topicName, pollRspWrapper->vgId);
          // release the dequeued response before bailing out, otherwise it is leaked
          pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
          taosFreeQitem(pollRspWrapper);
          return NULL;
        }

        if (pollRspWrapper->metaRsp.rspOffset.type != 0) {  // only adopt the offset when it is valid
          pVg->offsetInfo.currentOffset = pollRspWrapper->metaRsp.rspOffset;
        }

        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

        // build rsp
        SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
        taosFreeQitem(pollRspWrapper);
        return pRsp;
      } else {
        tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
                 tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
        pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
        taosFreeQitem(pollRspWrapper);
      }
    } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
      int32_t            consumerEpoch = atomic_load_32(&tmq->epoch);

      if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) {
        SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
        pollRspWrapper->vgHandle = pVg;
        pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName);
        if (pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL) {
          tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
                   pollRspWrapper->topicName, pollRspWrapper->vgId);
          // release the dequeued response before bailing out, otherwise it is leaked
          pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
          taosFreeQitem(pollRspWrapper);
          return NULL;
        }

        // update the local offset value only for the returned values, only when the local offset is NOT updated
        // by tmq_offset_seek function
        if (!pVg->seekUpdated) {
          if (pollRspWrapper->taosxRsp.rspOffset.type != 0) {  // only adopt the offset when it is valid
            tscDebug("consumer:0x%" PRIx64 " local offset is update, since seekupdate not set", tmq->consumerId);
            pVg->offsetInfo.currentOffset = pollRspWrapper->taosxRsp.rspOffset;
          }
        } else {
          tscDebug("consumer:0x%" PRIx64 " local offset is NOT update, since seekupdate is set", tmq->consumerId);
        }

        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

        if (pollRspWrapper->taosxRsp.blockNum == 0) {
          tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 ", reqId:0x%" PRIx64,
                   tmq->consumerId, pVg->vgId, pVg->numOfRows, pollRspWrapper->reqId);
          pVg->emptyBlockReceiveTs = taosGetTimestampMs();
          pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
          taosFreeQitem(pollRspWrapper);
          continue;
        } else {
          pVg->emptyBlockReceiveTs = 0;  // reset the ts
        }

        // build rsp
        // NOTE(review): when createTableNum is 0 no result is built and NULL is returned
        // without releasing the response payload; confirm whether that payload should be
        // freed on this path.
        void*   pRsp = NULL;
        int64_t numOfRows = 0;
        if (pollRspWrapper->taosxRsp.createTableNum == 0) {
          tscError("consumer:0x%" PRIx64 " createTableNum should > 0 if rsp type is data_meta", tmq->consumerId);
        } else {
          pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
        }

        tmq->totalRows += numOfRows;

        char buf[TSDB_OFFSET_LEN];
        tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.currentOffset);
        // the block count is read from taosxRsp, the member that is valid for this type
        tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
                 ", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64,
                 tmq->consumerId, pVg->vgId, buf, pollRspWrapper->taosxRsp.blockNum, numOfRows, pVg->numOfRows,
                 tmq->totalRows, pollRspWrapper->reqId);

        taosFreeQitem(pollRspWrapper);
        return pRsp;
      } else {
        tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
                 tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
        pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
        taosFreeQitem(pollRspWrapper);
      }
    } else {
      tscDebug("consumer:0x%" PRIx64 " not data msg received", tmq->consumerId);

      bool reset = false;
      tmqHandleNoPollRsp(tmq, pRspWrapper, &reset);
      taosFreeQitem(pRspWrapper);
      if (pollIfReset && reset) {
        tscDebug("consumer:0x%" PRIx64 ", reset and repoll", tmq->consumerId);
        tmqPollImpl(tmq, timeout);
      }
    }
  }
}

2063
/*
 * Poll the subscribed topics for the next result.
 *
 * tmq     : consumer handle; NULL is rejected and yields NULL.
 * timeout : maximum time to wait in milliseconds; a negative value waits without a
 *           deadline (the loop wakes up at least once per second).
 *
 * Returns a result object, or NULL when
 *   - tmq is NULL,
 *   - the consumer is still in INIT state (after sleeping 500 ms),
 *   - the consumer stays "not ready" for more than 40 ask-ep retries while recovering,
 *   - terrno equals TSDB_CODE_TQ_NO_COMMITTED_OFFSET after handling the responses, or
 *   - the timeout elapses without a result.
 *
 * NOTE(review): terrno is not cleared on entry, so a TSDB_CODE_TQ_NO_COMMITTED_OFFSET
 * left behind by an earlier call on this thread would end the loop early; confirm
 * whether that is intended.
 */
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
  if (tmq == NULL) {
    // nothing to poll on; avoid dereferencing an invalid handle
    return NULL;
  }

  void*   rspObj = NULL;
  int64_t startTime = taosGetTimestampMs();

  tscInfo("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime,
          timeout);

  // in no topic status, delayed task also need to be processed
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
    tscInfo("consumer:0x%" PRIx64 " poll return since consumer is init", tmq->consumerId);
    taosMsleep(500);  // sleep for a while
    return NULL;
  }

  // a recovering consumer must refresh its endpoints before it can poll
  while (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
    int32_t retryCnt = 0;
    while (TSDB_CODE_MND_CONSUMER_NOT_READY == doAskEp(tmq)) {
      if (retryCnt++ > 40) {
        return NULL;
      }

      tscInfo("consumer:0x%" PRIx64 " not ready, retry:%d/40 in 500ms", tmq->consumerId, retryCnt);
      taosMsleep(500);
    }
  }

  while (1) {
    tmqHandleAllDelayedTask(tmq);

    // a poll error is only logged; queued responses are still processed below
    if (tmqPollImpl(tmq, timeout) < 0) {
      tscError("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId);
    }

    rspObj = tmqHandleAllRsp(tmq, timeout, false);
    if (rspObj) {
      tscDebug("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj);
      return (TAOS_RES*)rspObj;
    } else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      tscInfo("consumer:0x%" PRIx64 " return null since no committed offset", tmq->consumerId);
      return NULL;
    }

    if (timeout >= 0) {
      int64_t currentTime = taosGetTimestampMs();
      int64_t elapsedTime = currentTime - startTime;
      if (elapsedTime > timeout) {
        tscInfo("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
                tmq->consumerId, tmq->epoch, startTime, currentTime);
        return NULL;
      }
      // wait for a response, but never longer than the remaining budget
      tsem_timewait(&tmq->rspSem, (timeout - elapsedTime));
    } else {
      // use tsem_timewait instead of tsem_wait to avoid unexpected stuck
      tsem_timewait(&tmq->rspSem, 1000);
    }
  }
}

2121 2122 2123 2124 2125 2126 2127 2128 2129 2130 2131 2132 2133 2134
/*
 * Dump the consumer's counters to the debug log: one summary line (poll count,
 * total rows, number of topics, epoch) followed by the row count of every
 * vgroup of every subscribed topic.
 */
static void displayConsumeStatistics(const tmq_t* pTmq) {
  int32_t topicCnt = taosArrayGetSize(pTmq->clientTopics);

  tscDebug("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d",
           pTmq->consumerId, pTmq->pollCnt, pTmq->totalRows, topicCnt, pTmq->epoch);
  tscDebug("consumer:0x%" PRIx64 " rows dist begin: ", pTmq->consumerId);

  int32_t topicIdx = 0;
  while (topicIdx < topicCnt) {
    SMqClientTopic* pTopic = taosArrayGet(pTmq->clientTopics, topicIdx);
    tscDebug("consumer:0x%" PRIx64 " topic:%d", pTmq->consumerId, topicIdx);

    int32_t vgCnt = taosArrayGetSize(pTopic->vgs);
    for (int32_t vgIdx = 0; vgIdx < vgCnt; vgIdx++) {
      SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, vgIdx);
      tscDebug("topic:%s, %d. vgId:%d rows:%" PRId64, pTopic->topicName, vgIdx, pClientVg->vgId,
               pClientVg->numOfRows);
    }

    topicIdx++;
  }

  tscDebug("consumer:0x%" PRIx64 " rows dist end", pTmq->consumerId);
}
2140

2141
/*
 * Close a consumer.
 *
 * When the consumer is in the READY state: optionally performs a synchronous
 * commit (auto-commit enabled), waits two seconds, then subscribes to an empty
 * topic list, retrying while the mnode answers "consumer not ready".
 * In every other state the consumer is closed directly.
 *
 * Returns the commit error code if the synchronous commit fails (the consumer
 * reference is NOT removed in that case), otherwise 0.
 */
int32_t tmq_consumer_close(tmq_t* tmq) {
  tscInfo("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status);
  displayConsumeStatistics(tmq);

  if (tmq->status != TMQ_CONSUMER_STATUS__READY) {
    tscInfo("consumer:0x%" PRIx64 " not in ready state, close it directly", tmq->consumerId);
  } else {
    // if auto commit is set, commit before close consumer. Otherwise, do nothing.
    if (tmq->autoCommit) {
      int32_t commitCode = tmq_commit_sync(tmq, NULL);
      if (commitCode != 0) {
        return commitCode;
      }
    }

    taosSsleep(2);  // sleep 2s for hb to send offset and rows to server

    // subscribing to an empty list; keep trying while the server reports "not ready"
    int32_t     attempts = 0;
    tmq_list_t* emptyTopics = tmq_list_new();
    while (tmq_subscribe(tmq, emptyTopics) == TSDB_CODE_MND_CONSUMER_NOT_READY && attempts <= 5) {
      attempts++;
      taosMsleep(500);
    }

    tmq_list_destroy(emptyTopics);
  }

  taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
  return 0;
}
L
Liu Jicong 已提交
2175

L
Liu Jicong 已提交
2176 2177
/*
 * Translate a tmq return code into a message: 0 -> "success", -1 -> "fail",
 * anything else is delegated to tstrerror().
 */
const char* tmq_err2str(int32_t err) {
  switch (err) {
    case 0:
      return "success";
    case -1:
      return "fail";
    default:
      return tstrerror(err);
  }
}
L
Liu Jicong 已提交
2185

L
Liu Jicong 已提交
2186 2187 2188 2189 2190
/*
 * Classify a poll result: data, table meta, meta+data, or invalid when the
 * result matches none of the TMQ result kinds.
 */
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return TMQ_RES_DATA;
  }

  if (TD_RES_TMQ_META(res)) {
    return TMQ_RES_TABLE_META;
  }

  if (TD_RES_TMQ_METADATA(res)) {
    return TMQ_RES_METADATA;
  }

  return TMQ_RES_INVALID;
}

L
Liu Jicong 已提交
2198
/*
 * Return the topic name of a poll result without its leading "<prefix>."
 * part (elsewhere in this file topic names are built as "<acctId>.<topic>").
 *
 * Returns NULL when res is NULL, when res is not a TMQ result, or when the
 * stored name contains no '.' separator. The returned pointer refers to
 * storage inside res.
 */
const char* tmq_get_topic_name(TAOS_RES* res) {
  if (res == NULL) {
    return NULL;
  }

  const char* fullName = NULL;
  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->topic;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->topic;
  } else if (TD_RES_TMQ_METADATA(res)) {
    fullName = ((SMqTaosxRspObj*)res)->topic;
  } else {
    return NULL;
  }

  // strchr() yields NULL when there is no separator; adding 1 to it would be undefined behavior
  const char* sep = strchr(fullName, '.');
  return (sep != NULL) ? sep + 1 : NULL;
}

L
Liu Jicong 已提交
2213 2214 2215 2216
/*
 * Return the database name of a poll result with everything up to and
 * including the first '.' stripped (presumably an account-id prefix, as for
 * topic names -- confirm against where the db field is filled in).
 *
 * Returns NULL when res is NULL, when res is not a TMQ result, or when the
 * stored name contains no '.' separator. The returned pointer refers to
 * storage owned by res.
 */
const char* tmq_get_db_name(TAOS_RES* res) {
  if (res == NULL) {
    return NULL;
  }

  const char* fullName = NULL;
  if (TD_RES_TMQ(res)) {
    fullName = ((SMqRspObj*)res)->db;
  } else if (TD_RES_TMQ_META(res)) {
    fullName = ((SMqMetaRspObj*)res)->db;
  } else if (TD_RES_TMQ_METADATA(res)) {
    fullName = ((SMqTaosxRspObj*)res)->db;
  } else {
    return NULL;
  }

  // strchr() yields NULL when there is no separator; adding 1 to it would be undefined behavior
  const char* sep = strchr(fullName, '.');
  return (sep != NULL) ? sep + 1 : NULL;
}

L
Liu Jicong 已提交
2228 2229 2230 2231
/*
 * Return the vgroup id a poll result came from, or -1 when res is not one of
 * the TMQ result kinds.
 */
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return ((SMqRspObj*)res)->vgId;
  }

  if (TD_RES_TMQ_META(res)) {
    return ((SMqMetaRspObj*)res)->vgId;
  }

  if (TD_RES_TMQ_METADATA(res)) {
    return ((SMqTaosxRspObj*)res)->vgId;
  }

  return -1;
}
L
Liu Jicong 已提交
2242

2243 2244 2245 2246 2247 2248 2249 2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260 2261 2262 2263 2264 2265
/*
 * Return the WAL version carried in the response offset of a poll result.
 * Yields -1 when the offset is not of type TMQ_OFFSET__LOG (data from tsdb,
 * no valid offset info) or when res is not a TMQ result.
 */
int64_t tmq_get_vgroup_offset(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pData = (SMqRspObj*)res;
    return (pData->rsp.rspOffset.type == TMQ_OFFSET__LOG) ? pData->rsp.rspOffset.version : -1;
  }

  if (TD_RES_TMQ_META(res)) {
    SMqMetaRspObj* pMeta = (SMqMetaRspObj*)res;
    return (pMeta->metaRsp.rspOffset.type == TMQ_OFFSET__LOG) ? pMeta->metaRsp.rspOffset.version : -1;
  }

  if (TD_RES_TMQ_METADATA(res)) {
    SMqTaosxRspObj* pTaosx = (SMqTaosxRspObj*)res;
    return (pTaosx->rsp.rspOffset.type == TMQ_OFFSET__LOG) ? pTaosx->rsp.rspOffset.version : -1;
  }

  // not a tmq result at all
  return -1;
}

L
Liu Jicong 已提交
2266 2267 2268 2269 2270 2271 2272
/*
 * Return the table name of the block the result iterator currently points at.
 * NULL is returned when the response carries no table names, when the iterator
 * is outside [0, blockNum), or when res is neither a data nor a meta+data
 * result.
 */
const char* tmq_get_table_name(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pData = (SMqRspObj*)res;
    bool       available = pData->rsp.withTbName && pData->rsp.blockTbName != NULL && pData->resIter >= 0 &&
                     pData->resIter < pData->rsp.blockNum;
    return available ? (const char*)taosArrayGetP(pData->rsp.blockTbName, pData->resIter) : NULL;
  }

  if (TD_RES_TMQ_METADATA(res)) {
    SMqTaosxRspObj* pTaosx = (SMqTaosxRspObj*)res;
    bool            available = pTaosx->rsp.withTbName && pTaosx->rsp.blockTbName != NULL && pTaosx->resIter >= 0 &&
                     pTaosx->resIter < pTaosx->rsp.blockNum;
    return available ? (const char*)taosArrayGetP(pTaosx->rsp.blockTbName, pTaosx->resIter) : NULL;
  }

  return NULL;
}
2284

2285 2286 2287 2288
/*
 * Commit offsets asynchronously; cb is handed to the commit routine together
 * with param. A NULL pRes commits all offsets of the consumer, otherwise only
 * the offset described by pRes.
 */
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* param) {
  if (pRes != NULL) {
    // only commit one offset
    asyncCommitOffset(tmq, pRes, TDMT_VND_TMQ_COMMIT_OFFSET, cb, param);
    return;
  }

  // here needs to commit all offsets.
  asyncCommitAllOffsets(tmq, cb, param);
}

2293
/*
 * Commit callback used by the synchronous wrappers: stores the result code in
 * the SSyncCommitInfo passed as param and posts its semaphore to wake the
 * waiting thread.
 */
static void commitCallBackFn(tmq_t *UNUSED_PARAM(tmq), int32_t code, void* param) {
  SSyncCommitInfo* pSyncInfo = param;

  pSyncInfo->code = code;
  tsem_post(&pSyncInfo->sem);
}
2298

2299 2300 2301 2302 2303 2304 2305 2306 2307
/*
 * Commit offsets and block until the commit callback fires.
 *
 * A NULL pRes commits all offsets of the consumer, otherwise only the offset
 * described by pRes is committed.
 *
 * Returns the code delivered to the commit callback, or
 * TSDB_CODE_OUT_OF_MEMORY when the wait context cannot be allocated.
 */
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
  int32_t code = 0;

  SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
  if (pInfo == NULL) {
    // without the wait context there is nothing to synchronize on; fail instead of dereferencing NULL
    tscError("consumer:0x%" PRIx64 " failed to prepare sync commit", tmq->consumerId);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tsem_init(&pInfo->sem, 0, 0);
  pInfo->code = 0;

  if (pRes == NULL) {
    asyncCommitAllOffsets(tmq, commitCallBackFn, pInfo);
  } else {
    asyncCommitOffset(tmq, pRes, TDMT_VND_TMQ_COMMIT_OFFSET, commitCallBackFn, pInfo);
  }

  // commitCallBackFn stores the result in pInfo->code and posts the semaphore
  tsem_wait(&pInfo->sem);
  code = pInfo->code;

  tsem_destroy(&pInfo->sem);
  taosMemoryFree(pInfo);

  tscDebug("consumer:0x%" PRIx64 " sync commit done, code:%s", tmq->consumerId, tstrerror(code));
  return code;
}

/*
 * Ask-ep callback for the synchronous path: records the result code in the
 * SAskEpInfo passed as param, applies the decoded endpoint response to the
 * consumer on success, and finally posts the semaphore in SAskEpInfo.
 */
void updateEpCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) {
  SAskEpInfo* pAskInfo = (SAskEpInfo*)param;
  pAskInfo->code = code;

  if (TSDB_CODE_SUCCESS == code) {
    // the payload starts with an SMqRspHead, the encoded SMqAskEpRsp follows it
    SMqRspHead* pHead = pDataBuf->pData;
    void*       pBody = POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead));

    SMqAskEpRsp epRsp;
    tDecodeSMqAskEpRsp(pBody, &epRsp);
    doUpdateLocalEp(pTmq, pHead->epoch, &epRsp);
    tDeleteSMqAskEpRsp(&epRsp);
  }

  tsem_post(&pAskInfo->sem);
}

/*
 * Ask-ep callback for the asynchronous path: wraps the endpoint response in an
 * SMqAskEpRspWrapper queue item and writes it to pTmq->mqueue.
 *
 * On a non-success code, or when the queue item cannot be allocated, terrno is
 * set and nothing is enqueued. The param argument is not used.
 */
void addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) {
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return;
  }

  SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM, 0);
  if (pWrapper == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return;
  }

  // the payload starts with an SMqRspHead, the encoded SMqAskEpRsp follows it
  SMqRspHead* head = pDataBuf->pData;

  pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
  pWrapper->epoch = head->epoch;
  // NOTE(review): the head bytes are copied into msg and msg is then the decode target as well;
  // presumably the decode overwrites what the memcpy wrote -- confirm the memcpy is still needed
  memcpy(&pWrapper->msg, pDataBuf->pData, sizeof(SMqRspHead));
  tDecodeSMqAskEpRsp(POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead)), &pWrapper->msg);

  taosWriteQitem(pTmq->mqueue, pWrapper);
}

/*
 * Ask for the endpoint information and block until updateEpCallbackFn has
 * processed the answer.
 *
 * Returns the code recorded by the callback, or TSDB_CODE_OUT_OF_MEMORY when
 * the wait context cannot be allocated.
 */
int32_t doAskEp(tmq_t* pTmq) {
  SAskEpInfo* pInfo = taosMemoryMalloc(sizeof(SAskEpInfo));
  if (pInfo == NULL) {
    // fail early instead of initializing a semaphore through a NULL pointer
    tscError("consumer:0x%" PRIx64 " failed to prepare ask ep operation", pTmq->consumerId);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tsem_init(&pInfo->sem, 0, 0);

  // updateEpCallbackFn stores the result in pInfo->code and posts the semaphore
  asyncAskEp(pTmq, updateEpCallbackFn, pInfo);
  tsem_wait(&pInfo->sem);

  int32_t code = pInfo->code;
  tsem_destroy(&pInfo->sem);
  taosMemoryFree(pInfo);
  return code;
}

/*
 * Build a TDMT_MND_TMQ_ASK_EP request for the consumer and send it
 * asynchronously to the mnode endpoint set.
 *
 * If the request cannot be serialized or any allocation fails, askEpFn is
 * invoked immediately with an error code, a NULL data buffer and param.
 * Otherwise askEpFn and param are stored in the SMqAskEpCbParam attached to
 * the message; the response is handled by askEpCallbackFn (presumably it
 * forwards to askEpFn -- see its definition).
 */
void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param) {
  SMqAskEpReq req = {0};
  req.consumerId = pTmq->consumerId;
  req.epoch = pTmq->epoch;
  // bounded copy: never write past the fixed-size cgroup field
  tstrncpy(req.cgroup, pTmq->groupId, tListLen(req.cgroup));

  // first pass computes the encoded size only
  int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req);
  if (tlen < 0) {
    tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", pTmq->consumerId);
    askEpFn(pTmq, TSDB_CODE_INVALID_PARA, NULL, param);
    return;
  }

  void* pReq = taosMemoryCalloc(1, tlen);
  if (pReq == NULL) {
    tscError("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", pTmq->consumerId, tlen);
    askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param);
    return;
  }

  if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) {
    tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", pTmq->consumerId, tlen);
    taosMemoryFree(pReq);

    askEpFn(pTmq, TSDB_CODE_INVALID_PARA, NULL, param);
    return;
  }

  SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
  if (pParam == NULL) {
    tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", pTmq->consumerId);
    taosMemoryFree(pReq);

    askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param);
    return;
  }

  pParam->refId = pTmq->refId;
  pParam->epoch = pTmq->epoch;
  pParam->pUserFn = askEpFn;
  pParam->pParam = param;

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    taosMemoryFree(pParam);
    taosMemoryFree(pReq);
    askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param);
    return;
  }

  sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = tlen, .handle = NULL};

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = askEpCallbackFn;
  sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;

  SEpSet epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp);
  tscInfo("consumer:0x%" PRIx64 " ask ep from mnode, reqId:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId);

  int64_t transporterId = 0;
  asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
}

/*
 * Write "<topicName>:<vg>" into dst and return the number of characters
 * written (terminator excluded). No size is passed in, so dst must be large
 * enough for the formatted key.
 */
int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg) {
  const int32_t keyLen = sprintf(dst, "%s:%d", topicName, vg);
  return keyLen;
}

/*
 * Finish a commit: look up the consumer by the reference id stored in
 * pParamSet, invoke the commit callback with the collected code and user
 * parameter, and free pParamSet.
 *
 * Returns 0 on success. If the consumer reference can no longer be acquired,
 * pParamSet is freed, terrno is set to TSDB_CODE_TMQ_CONSUMER_CLOSED, the
 * callback is not invoked and -1 is returned.
 */
int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
  const int64_t consumerRef = pParamSet->refId;

  tmq_t* pConsumer = taosAcquireRef(tmqMgmt.rsetId, consumerRef);
  if (NULL == pConsumer) {
    taosMemoryFree(pParamSet);
    terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
    return -1;
  }

  // if no more waiting rsp
  pParamSet->callbackFn(pConsumer, pParamSet->code, pParamSet->userParam);
  taosMemoryFree(pParamSet);

  taosReleaseRef(tmqMgmt.rsetId, consumerRef);
  return 0;
}

2461
/*
 * Account for one received commit response: atomically decrements the number
 * of outstanding responses in pParamSet and completes the commit through
 * tmqCommitDone() once it reaches zero. consumerId, pTopic and vgId are used
 * for logging only.
 */
void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) {
  int32_t remaining = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);

  if (remaining != 0) {
    tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d commit-rsp received, remain:%d", consumerId, pTopic, vgId,
             remaining);
    return;
  }

  tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d all commit-rsp received, commit completed", consumerId, pTopic,
           vgId);
  tmqCommitDone(pParamSet);
}
2472 2473 2474 2475 2476 2477 2478 2479 2480 2481 2482 2483 2484 2485 2486 2487 2488 2489 2490 2491 2492 2493

/*
 * Advance the result iterator of a TMQ data response to the next block and
 * load that block into the embedded result info.
 *
 * When the response carries schemas, the schema of the new block is installed
 * and the per-block buffers of the result info are released first.
 * Returns the result info, or NULL once the iterator has moved past the last
 * block.
 */
SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) {
  SMqRspObj* pObj = (SMqRspObj*)res;

  pObj->resIter++;
  if (pObj->resIter >= pObj->rsp.blockNum) {
    return NULL;
  }

  SReqResultInfo*    pResInfo = &pObj->resInfo;
  SRetrieveTableRsp* pBlock = (SRetrieveTableRsp*)taosArrayGetP(pObj->rsp.blockData, pObj->resIter);

  if (pObj->rsp.withSchema) {
    SSchemaWrapper* pSchema = (SSchemaWrapper*)taosArrayGetP(pObj->rsp.blockSchema, pObj->resIter);
    setResSchemaInfo(pResInfo, pSchema->pSchema, pSchema->nCols);
    taosMemoryFreeClear(pResInfo->row);
    taosMemoryFreeClear(pResInfo->pCol);
    taosMemoryFreeClear(pResInfo->length);
    taosMemoryFreeClear(pResInfo->convertBuf);
    taosMemoryFreeClear(pResInfo->convertJson);
  }

  setQueryResultFromRsp(pResInfo, pBlock, convertUcs4, false);
  return pResInfo;
}

2496 2497 2498 2499 2500 2501 2502 2503 2504 2505 2506 2507 2508 2509 2510 2511 2512 2513 2514 2515 2516
/*
 * Response handler for a TDMT_VND_TMQ_VG_WALINFO request.
 *
 * On success the WAL range from the response head and the decoded response
 * offset are appended (under pCommon->mutex) to pCommon->pList; on failure the
 * error code is stored in pCommon->code. After the outcome has been recorded
 * the response counter is incremented, and the callback that brings it to
 * pParam->totalReq posts pCommon->rsp to wake the waiting thread.
 *
 * param is the SMqVgWalInfoParam of this request and is freed here.
 * Always returns 0.
 */
static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) {
  SMqVgWalInfoParam* pParam = param;
  SMqVgCommon*       pCommon = pParam->pCommon;

  if (code != TSDB_CODE_SUCCESS) {
    tscError("consumer:0x%" PRIx64 " failed to get the wal info from vgId:%d for topic:%s", pCommon->consumerId,
             pParam->vgId, pCommon->pTopicName);
    pCommon->code = code;
  } else {
    SMqDataRsp rsp = {0};
    SDecoder   decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeMqDataRsp(&decoder, &rsp);
    tDecoderClear(&decoder);

    SMqRspHead* pHead = pMsg->pData;

    tmq_topic_assignment assignment = {.begin = pHead->walsver,
                                       .end = pHead->walever,
                                       .currentOffset = rsp.rspOffset.version,
                                       .vgId = pParam->vgId};

    taosThreadMutexLock(&pCommon->mutex);
    taosArrayPush(pCommon->pList, &assignment);
    taosThreadMutexUnlock(&pCommon->mutex);
  }

  // Count this response only after its outcome has been recorded. The waiter reads and then
  // destroys pCommon as soon as the semaphore is posted, so no callback may touch pCommon after
  // it has been counted (except the last one, which does the post).
  int32_t total = atomic_add_fetch_32(&pCommon->numOfRsp, 1);
  if (total == pParam->totalReq) {
    tsem_post(&pCommon->rsp);
  }

  taosMemoryFree(pParam);
  return 0;
}

/*
 * Release everything held by a SMqVgCommon: the assignment list, the response
 * semaphore, the mutex, the topic name copy, and finally the structure itself.
 * pCommon must not be used after this call.
 */
static void destroyCommonInfo(SMqVgCommon* pCommon) {
  taosArrayDestroy(pCommon->pList);
  tsem_destroy(&pCommon->rsp);
  taosThreadMutexDestroy(&pCommon->mutex);
  taosMemoryFree(pCommon->pTopicName);
  taosMemoryFree(pCommon);
}

H
Haojun Liao 已提交
2540
/* Free an assignment array that will not be handed to the caller and reset both output parameters. */
static void tmqClearAssignmentOutput(tmq_topic_assignment** assignment, int32_t* numOfAssignment) {
  taosMemoryFree(*assignment);
  *assignment = NULL;
  *numOfAssignment = 0;
}

/*
 * Report, for every vgroup of a subscribed topic, the WAL version range and
 * the current offset.
 *
 * tmq              consumer handle
 * pTopicName       topic name without the account prefix
 * assignment       out: newly allocated array, released with tmq_free_assignment()
 * numOfAssignment  out: number of entries in *assignment
 *
 * If every vgroup already has information received from its vnode, the result
 * is built from the locally cached offsets. Otherwise a
 * TDMT_VND_TMQ_VG_WALINFO request is sent to each vgroup, the call blocks
 * until all responses have arrived, and the cached offsets are refreshed from
 * the responses.
 *
 * Returns TSDB_CODE_SUCCESS or an error code. On every error *assignment is
 * NULL and *numOfAssignment is 0.
 */
int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_assignment** assignment,
                                 int32_t* numOfAssignment) {
  if (assignment == NULL || numOfAssignment == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  *numOfAssignment = 0;
  *assignment = NULL;

  if (tmq == NULL || pTopicName == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t accId = tmq->pTscObj->acctId;
  char    tname[128] = {0};
  // bounded: an over-long topic name is truncated and then simply not found
  snprintf(tname, sizeof(tname), "%d.%s", accId, pTopicName);

  SMqClientTopic* pTopic = getTopicByName(tmq, tname);
  if (pTopic == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  // in case of snapshot is opened, no valid offset will return
  *numOfAssignment = taosArrayGetSize(pTopic->vgs);

  *assignment = taosMemoryCalloc(*numOfAssignment, sizeof(tmq_topic_assignment));
  if (*assignment == NULL) {
    tscError("consumer:0x%" PRIx64 " failed to malloc buffer, size:%" PRIzu, tmq->consumerId,
             (*numOfAssignment) * sizeof(tmq_topic_assignment));
    *numOfAssignment = 0;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  bool needFetch = false;

  // fill the result from the cached offsets; stop as soon as one vgroup has no vnode info yet
  for (int32_t j = 0; j < (*numOfAssignment); ++j) {
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
    if (!pClientVg->receivedInfoFromVnode) {
      needFetch = true;
      break;
    }

    tmq_topic_assignment* pAssignment = &(*assignment)[j];
    if (pClientVg->offsetInfo.currentOffset.type == TMQ_OFFSET__LOG) {
      pAssignment->currentOffset = pClientVg->offsetInfo.currentOffset.version;
    } else {
      pAssignment->currentOffset = 0;
    }

    pAssignment->begin = pClientVg->offsetInfo.walVerBegin;
    pAssignment->end = pClientVg->offsetInfo.walVerEnd;
    pAssignment->vgId = pClientVg->vgId;
  }

  if (!needFetch) {
    return TSDB_CODE_SUCCESS;
  }

  SMqVgCommon* pCommon = taosMemoryCalloc(1, sizeof(SMqVgCommon));
  if (pCommon == NULL) {
    tmqClearAssignmentOutput(assignment, numOfAssignment);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return terrno;
  }

  pCommon->pList = taosArrayInit(4, sizeof(tmq_topic_assignment));
  tsem_init(&pCommon->rsp, 0, 0);
  taosThreadMutexInit(&pCommon->mutex, 0);
  pCommon->pTopicName = taosStrdup(pTopic->topicName);
  pCommon->consumerId = tmq->consumerId;

  // NOTE(review): if a failure below happens after some requests were already sent, their
  // callbacks still reference pCommon, which is destroyed on return -- needs a separate fix.
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  for (int32_t i = 0; i < (*numOfAssignment); ++i) {
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);

    SMqVgWalInfoParam* pParam = taosMemoryMalloc(sizeof(SMqVgWalInfoParam));
    if (pParam == NULL) {
      destroyCommonInfo(pCommon);
      tmqClearAssignmentOutput(assignment, numOfAssignment);
      return terrno;
    }

    pParam->epoch = tmq->epoch;
    pParam->vgId = pClientVg->vgId;
    pParam->totalReq = *numOfAssignment;
    pParam->pCommon = pCommon;

    SMqPollReq req = {0};
    tmqBuildConsumeReqImpl(&req, tmq, 10, pTopic, pClientVg);

    int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
    if (msgSize < 0) {
      taosMemoryFree(pParam);
      destroyCommonInfo(pCommon);
      tmqClearAssignmentOutput(assignment, numOfAssignment);
      return terrno;
    }

    char* msg = taosMemoryCalloc(1, msgSize);
    if (NULL == msg) {
      taosMemoryFree(pParam);
      destroyCommonInfo(pCommon);
      tmqClearAssignmentOutput(assignment, numOfAssignment);
      return terrno;
    }

    if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
      taosMemoryFree(msg);
      taosMemoryFree(pParam);
      destroyCommonInfo(pCommon);
      tmqClearAssignmentOutput(assignment, numOfAssignment);
      return terrno;
    }

    SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
    if (sendInfo == NULL) {
      taosMemoryFree(pParam);
      taosMemoryFree(msg);
      destroyCommonInfo(pCommon);
      tmqClearAssignmentOutput(assignment, numOfAssignment);
      return terrno;
    }

    sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
    sendInfo->requestId = req.reqId;
    sendInfo->requestObjRefId = 0;
    sendInfo->param = pParam;
    sendInfo->fp = tmqGetWalInfoCb;
    sendInfo->msgType = TDMT_VND_TMQ_VG_WALINFO;

    int64_t transporterId = 0;
    char    offsetFormatBuf[TSDB_OFFSET_LEN];
    tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.currentOffset);

    tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64,
            tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
    asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, &transporterId, sendInfo);
  }

  // tmqGetWalInfoCb posts the semaphore once all responses have been received
  tsem_wait(&pCommon->rsp);
  int32_t code = pCommon->code;

  terrno = code;
  if (code != TSDB_CODE_SUCCESS) {
    tmqClearAssignmentOutput(assignment, numOfAssignment);
  } else {
    int32_t num = taosArrayGetSize(pCommon->pList);
    for (int32_t i = 0; i < num; ++i) {
      (*assignment)[i] = *(tmq_topic_assignment*)taosArrayGet(pCommon->pList, i);
    }
    *numOfAssignment = num;
  }

  // refresh the locally cached offsets with what the vnodes reported
  for (int32_t j = 0; j < (*numOfAssignment); ++j) {
    tmq_topic_assignment* p = &(*assignment)[j];

    for (int32_t i = 0; i < taosArrayGetSize(pTopic->vgs); ++i) {
      SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
      if (pClientVg->vgId != p->vgId) {
        continue;
      }

      SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo;

      pOffsetInfo->currentOffset.type = TMQ_OFFSET__LOG;
      pOffsetInfo->walVerBegin = p->begin;
      pOffsetInfo->walVerEnd = p->end;
      pOffsetInfo->currentOffset.version = p->currentOffset;
      pOffsetInfo->committedOffset.version = p->currentOffset;

      // format after the update so that the log shows the new offset
      char offsetBuf[TSDB_OFFSET_LEN] = {0};
      tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset);

      tscInfo("vgId:%d offset is update to:%s", p->vgId, offsetBuf);
    }
  }

  destroyCommonInfo(pCommon);
  return code;
}

T
t_max 已提交
2709 2710 2711 2712 2713 2714 2715 2716
/*
 * Release an assignment array obtained from tmq_get_topic_assignment().
 * Passing NULL is allowed and does nothing.
 */
void tmq_free_assignment(tmq_topic_assignment* pAssignment) {
    if (pAssignment != NULL) {
        taosMemoryFree(pAssignment);
    }
}

2717
/*
 * Move the consume position of one vgroup of a subscribed topic to the given
 * WAL version and send the new position to the vnode, blocking until the
 * request has completed.
 *
 * tmq         consumer handle
 * pTopicName  topic name without the account prefix
 * vgId        vgroup whose position is changed
 * offset      target WAL version; when the current offset is a WAL offset it
 *             must lie within [walVerBegin, walVerEnd]
 *
 * Returns TSDB_CODE_INVALID_PARA for a NULL handle or topic name, an unknown
 * topic or vgroup, a current offset that is neither a WAL version nor a reset
 * offset, or an out-of-range target; TSDB_CODE_OUT_OF_MEMORY when the wait
 * context cannot be allocated; otherwise the code reported for the seek
 * request.
 */
int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) {
  if (tmq == NULL) {
    tscError("invalid tmq handle, null");
    return TSDB_CODE_INVALID_PARA;
  }

  if (pTopicName == NULL) {
    tscError("consumer:0x%" PRIx64 " invalid topic name, null", tmq->consumerId);
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t accId = tmq->pTscObj->acctId;
  char    tname[128] = {0};
  // bounded: an over-long topic name is truncated and then simply not found
  snprintf(tname, sizeof(tname), "%d.%s", accId, pTopicName);

  SMqClientTopic* pTopic = getTopicByName(tmq, tname);
  if (pTopic == NULL) {
    tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName);
    return TSDB_CODE_INVALID_PARA;
  }

  SMqClientVg* pVg = NULL;
  int32_t      numOfVgs = taosArrayGetSize(pTopic->vgs);
  for (int32_t i = 0; i < numOfVgs; ++i) {
    SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
    if (pClientVg->vgId == vgId) {
      pVg = pClientVg;
      break;
    }
  }

  if (pVg == NULL) {
    tscError("consumer:0x%" PRIx64 " invalid vgroup id:%d", tmq->consumerId, vgId);
    return TSDB_CODE_INVALID_PARA;
  }

  SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo;

  int32_t type = pOffsetInfo->currentOffset.type;
  if (type != TMQ_OFFSET__LOG && !OFFSET_IS_RESET_OFFSET(type)) {
    tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type);
    return TSDB_CODE_INVALID_PARA;
  }

  if (type == TMQ_OFFSET__LOG && (offset < pOffsetInfo->walVerBegin || offset > pOffsetInfo->walVerEnd)) {
    tscError("consumer:0x%" PRIx64 " invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]",
             tmq->consumerId, offset, pOffsetInfo->walVerBegin, pOffsetInfo->walVerEnd);
    return TSDB_CODE_INVALID_PARA;
  }

  // update the offset, and then commit to vnode
  if (pOffsetInfo->currentOffset.type == TMQ_OFFSET__LOG) {
    pOffsetInfo->currentOffset.version = offset;
    pOffsetInfo->committedOffset.version = INT64_MIN;
    pVg->seekUpdated = true;
  }

  // stack-allocated result object describing the vgroup; valid because this function
  // waits for the request to complete before returning
  SMqRspObj rspObj = {.resType = RES_TYPE__TMQ, .vgId = pVg->vgId};
  tstrncpy(rspObj.topic, tname, tListLen(rspObj.topic));

  tscInfo("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, pVg->vgId);

  SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
  if (pInfo == NULL) {
    tscError("consumer:0x%" PRIx64 " failed to prepare seek operation", tmq->consumerId);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  tsem_init(&pInfo->sem, 0, 0);
  pInfo->code = 0;

  asyncCommitOffset(tmq, &rspObj, TDMT_VND_TMQ_SEEK_TO_OFFSET, commitCallBackFn, pInfo);

  // commitCallBackFn stores the result in pInfo->code and posts the semaphore
  tsem_wait(&pInfo->sem);
  int32_t code = pInfo->code;

  tsem_destroy(&pInfo->sem);
  taosMemoryFree(pInfo);

  if (code != TSDB_CODE_SUCCESS) {
    tscError("consumer:0x%" PRIx64 " failed to send seek to vgId:%d, code:%s", tmq->consumerId, pVg->vgId,
             tstrerror(code));
  }

  return code;
}