You need to sign in or sign up before continuing.
tmq.c 54.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
H
Haojun Liao 已提交
19
#include "tdatablock.h"
20 21 22
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
X
Xiaoyu Wang 已提交
23
#include "tqueue.h"
24
#include "tref.h"
L
Liu Jicong 已提交
25 26
#include "ttimer.h"

L
Liu Jicong 已提交
27 28 29 30 31 32 33 34
int32_t tmqAskEp(tmq_t* tmq, bool async);

typedef struct {
  int8_t inited;
  tmr_h  timer;
} SMqMgmt;

static SMqMgmt tmqMgmt = {0};
35

L
Liu Jicong 已提交
36 37 38 39 40 41
typedef struct {
  int8_t  tmqRspType;
  int32_t epoch;
} SMqRspWrapper;

typedef struct {
L
Liu Jicong 已提交
42 43 44
  int8_t      tmqRspType;
  int32_t     epoch;
  SMqAskEpRsp msg;
L
Liu Jicong 已提交
45 46
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
47
struct tmq_list_t {
L
Liu Jicong 已提交
48
  SArray container;
L
Liu Jicong 已提交
49
};
L
Liu Jicong 已提交
50

L
Liu Jicong 已提交
51
struct tmq_conf_t {
52 53 54 55 56
  char     clientId[256];
  char     groupId[TSDB_CGROUP_LEN];
  int8_t   autoCommit;
  int8_t   resetOffset;
  int8_t   withTbName;
L
Liu Jicong 已提交
57 58
  int8_t   spEnable;
  int32_t  spBatchSize;
59 60 61 62 63 64
  uint16_t port;
  int32_t  autoCommitInterval;
  char*    ip;
  char*    user;
  char*    pass;
  /*char*          db;*/
65
  tmq_commit_cb* commitCb;
L
Liu Jicong 已提交
66
  void*          commitCbUserParam;
L
Liu Jicong 已提交
67 68 69
};

struct tmq_t {
L
Liu Jicong 已提交
70
  // conf
L
Liu Jicong 已提交
71 72
  char           groupId[TSDB_CGROUP_LEN];
  char           clientId[256];
73
  int8_t         withTbName;
L
Liu Jicong 已提交
74
  int8_t         useSnapshot;
L
Liu Jicong 已提交
75
  int8_t         autoCommit;
L
Liu Jicong 已提交
76
  int32_t        autoCommitInterval;
L
Liu Jicong 已提交
77
  int32_t        resetOffsetCfg;
L
Liu Jicong 已提交
78
  int64_t        consumerId;
L
Liu Jicong 已提交
79 80
  tmq_commit_cb* commitCb;
  void*          commitCbUserParam;
L
Liu Jicong 已提交
81 82 83 84

  // status
  int8_t  status;
  int32_t epoch;
L
Liu Jicong 已提交
85 86
#if 0
  int8_t  epStatus;
L
Liu Jicong 已提交
87
  int32_t epSkipCnt;
L
Liu Jicong 已提交
88
#endif
L
Liu Jicong 已提交
89 90
  int64_t pollCnt;

L
Liu Jicong 已提交
91 92 93 94 95
  // timer
  tmr_h hbTimer;
  tmr_h reportTimer;
  tmr_h commitTimer;

L
Liu Jicong 已提交
96 97 98 99
  // connection
  STscObj* pTscObj;

  // container
L
Liu Jicong 已提交
100
  SArray*     clientTopics;  // SArray<SMqClientTopic>
L
Liu Jicong 已提交
101
  STaosQueue* mqueue;        // queue of rsp
L
Liu Jicong 已提交
102
  STaosQall*  qall;
L
Liu Jicong 已提交
103 104 105 106
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit

  // ctl
  tsem_t rspSem;
L
Liu Jicong 已提交
107 108
};

X
Xiaoyu Wang 已提交
109 110 111 112 113 114 115 116
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT,
};

enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY,
117
  TMQ_CONSUMER_STATUS__NO_TOPIC,
L
Liu Jicong 已提交
118 119
};

L
Liu Jicong 已提交
120 121 122 123 124 125
enum {
  TMQ_DELAYED_TASK__HB = 1,
  TMQ_DELAYED_TASK__REPORT,
  TMQ_DELAYED_TASK__COMMIT,
};

L
Liu Jicong 已提交
126
typedef struct {
127 128 129
  // statistics
  int64_t pollCnt;
  // offset
L
Liu Jicong 已提交
130 131 132 133
  /*int64_t      committedOffset;*/
  /*int64_t      currentOffset;*/
  STqOffsetVal committedOffsetNew;
  STqOffsetVal currentOffsetNew;
L
Liu Jicong 已提交
134
  // connection info
135
  int32_t vgId;
X
Xiaoyu Wang 已提交
136
  int32_t vgStatus;
L
Liu Jicong 已提交
137
  int32_t vgSkipCnt;
138 139 140
  SEpSet  epSet;
} SMqClientVg;

L
Liu Jicong 已提交
141
typedef struct {
142
  // subscribe info
L
Liu Jicong 已提交
143
  char* topicName;
L
Liu Jicong 已提交
144
  char  db[TSDB_DB_FNAME_LEN];
L
Liu Jicong 已提交
145 146 147

  SArray* vgs;  // SArray<SMqClientVg>

L
Liu Jicong 已提交
148 149
  int8_t         isSchemaAdaptive;
  SSchemaWrapper schema;
150 151
} SMqClientTopic;

L
Liu Jicong 已提交
152 153 154 155 156
typedef struct {
  int8_t          tmqRspType;
  int32_t         epoch;
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
L
Liu Jicong 已提交
157
  union {
L
Liu Jicong 已提交
158 159
    SMqDataRsp dataRsp;
    SMqMetaRsp metaRsp;
L
Liu Jicong 已提交
160
  };
L
Liu Jicong 已提交
161 162
} SMqPollRspWrapper;

L
Liu Jicong 已提交
163
typedef struct {
L
Liu Jicong 已提交
164 165 166
  tmq_t*  tmq;
  tsem_t  rspSem;
  int32_t rspErr;
L
Liu Jicong 已提交
167
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
168

L
Liu Jicong 已提交
169
typedef struct {
170
  tmq_t*  tmq;
L
Liu Jicong 已提交
171
  int32_t code;
L
Liu Jicong 已提交
172
  int32_t async;
X
Xiaoyu Wang 已提交
173
  tsem_t  rspSem;
174 175
} SMqAskEpCbParam;

L
Liu Jicong 已提交
176
typedef struct {
L
Liu Jicong 已提交
177 178
  tmq_t*          tmq;
  SMqClientVg*    pVg;
L
Liu Jicong 已提交
179
  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
180
  int32_t         epoch;
L
Liu Jicong 已提交
181
  int32_t         vgId;
L
Liu Jicong 已提交
182
  tsem_t          rspSem;
X
Xiaoyu Wang 已提交
183
} SMqPollCbParam;
184

L
Liu Jicong 已提交
185
#if 0
L
Liu Jicong 已提交
186
typedef struct {
L
Liu Jicong 已提交
187
  tmq_t*         tmq;
L
Liu Jicong 已提交
188 189
  int8_t         async;
  int8_t         automatic;
L
Liu Jicong 已提交
190
  int8_t         freeOffsets;
L
Liu Jicong 已提交
191
  tmq_commit_cb* userCb;
L
Liu Jicong 已提交
192
  tsem_t         rspSem;
L
Liu Jicong 已提交
193
  int32_t        rspErr;
L
Liu Jicong 已提交
194
  SArray*        offsets;
L
Liu Jicong 已提交
195
  void*          userParam;
L
Liu Jicong 已提交
196
} SMqCommitCbParam;
L
Liu Jicong 已提交
197
#endif
L
Liu Jicong 已提交
198

199
typedef struct {
L
Liu Jicong 已提交
200 201 202 203
  tmq_t* tmq;
  int8_t automatic;
  int8_t async;
  /*int8_t         freeOffsets;*/
L
Liu Jicong 已提交
204 205
  int32_t        waitingRspNum;
  int32_t        totalRspNum;
L
Liu Jicong 已提交
206
  int32_t        rspErr;
207
  tmq_commit_cb* userCb;
L
Liu Jicong 已提交
208 209 210 211
  /*SArray*        successfulOffsets;*/
  /*SArray*        failedOffsets;*/
  void*  userParam;
  tsem_t rspSem;
212 213 214 215 216 217 218
} SMqCommitCbParamSet;

typedef struct {
  SMqCommitCbParamSet* params;
  STqOffset*           pOffset;
} SMqCommitCbParam2;

219
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
220
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
221
  conf->withTbName = false;
L
Liu Jicong 已提交
222
  conf->autoCommit = true;
L
Liu Jicong 已提交
223
  conf->autoCommitInterval = 5000;
X
Xiaoyu Wang 已提交
224
  conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
225 226 227
  return conf;
}

L
Liu Jicong 已提交
228
void tmq_conf_destroy(tmq_conf_t* conf) {
L
Liu Jicong 已提交
229 230 231 232 233 234
  if (conf) {
    if (conf->ip) taosMemoryFree(conf->ip);
    if (conf->user) taosMemoryFree(conf->user);
    if (conf->pass) taosMemoryFree(conf->pass);
    taosMemoryFree(conf);
  }
L
Liu Jicong 已提交
235 236 237
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
238 239
  if (strcmp(key, "group.id") == 0) {
    strcpy(conf->groupId, value);
L
Liu Jicong 已提交
240
    return TMQ_CONF_OK;
241
  }
L
Liu Jicong 已提交
242

243 244
  if (strcmp(key, "client.id") == 0) {
    strcpy(conf->clientId, value);
L
Liu Jicong 已提交
245 246
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
247

L
Liu Jicong 已提交
248 249
  if (strcmp(key, "enable.auto.commit") == 0) {
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
250
      conf->autoCommit = true;
L
Liu Jicong 已提交
251 252
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
253
      conf->autoCommit = false;
L
Liu Jicong 已提交
254 255 256 257
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
258
  }
L
Liu Jicong 已提交
259

L
Liu Jicong 已提交
260 261 262 263 264
  if (strcmp(key, "auto.commit.interval.ms") == 0) {
    conf->autoCommitInterval = atoi(value);
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
265 266 267 268 269 270 271 272 273 274 275 276 277 278
  if (strcmp(key, "auto.offset.reset") == 0) {
    if (strcmp(value, "none") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "earliest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
279

280 281
  if (strcmp(key, "msg.with.table.name") == 0) {
    if (strcmp(value, "true") == 0) {
282
      conf->withTbName = true;
L
Liu Jicong 已提交
283
      return TMQ_CONF_OK;
284
    } else if (strcmp(value, "false") == 0) {
285
      conf->withTbName = false;
L
Liu Jicong 已提交
286
      return TMQ_CONF_OK;
287 288 289 290 291
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
292
  if (strcmp(key, "experimental.snapshot.enable") == 0) {
L
Liu Jicong 已提交
293
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
294
      conf->spEnable = true;
L
Liu Jicong 已提交
295 296
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
297
      conf->spEnable = false;
L
Liu Jicong 已提交
298 299 300 301 302 303
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
304 305 306 307 308
  if (strcmp(key, "experimental.snapshot.batch.size") == 0) {
    conf->spBatchSize = atoi(value);
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
309
  if (strcmp(key, "td.connect.ip") == 0) {
L
Liu Jicong 已提交
310 311 312
    conf->ip = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
313
  if (strcmp(key, "td.connect.user") == 0) {
L
Liu Jicong 已提交
314 315 316
    conf->user = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
317
  if (strcmp(key, "td.connect.pass") == 0) {
L
Liu Jicong 已提交
318 319 320
    conf->pass = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
321
  if (strcmp(key, "td.connect.port") == 0) {
L
Liu Jicong 已提交
322 323 324
    conf->port = atoi(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
325
  if (strcmp(key, "td.connect.db") == 0) {
326
    /*conf->db = strdup(value);*/
L
Liu Jicong 已提交
327 328 329
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
330
  return TMQ_CONF_UNKNOWN;
331 332 333
}

tmq_list_t* tmq_list_new() {
L
Liu Jicong 已提交
334 335
  //
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
336 337
}

L
Liu Jicong 已提交
338 339 340
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  SArray* container = &list->container;
  char*   topic = strdup(src);
L
fix  
Liu Jicong 已提交
341
  if (taosArrayPush(container, &topic) == NULL) return -1;
342 343 344
  return 0;
}

L
Liu Jicong 已提交
345
void tmq_list_destroy(tmq_list_t* list) {
L
Liu Jicong 已提交
346
  SArray* container = &list->container;
L
Liu Jicong 已提交
347
  taosArrayDestroyP(container, taosMemoryFree);
L
Liu Jicong 已提交
348 349
}

L
Liu Jicong 已提交
350 351 352 353 354 355 356 357 358 359
int32_t tmq_list_get_size(const tmq_list_t* list) {
  const SArray* container = &list->container;
  return taosArrayGetSize(container);
}

char** tmq_list_to_c_array(const tmq_list_t* list) {
  const SArray* container = &list->container;
  return container->pData;
}

L
Liu Jicong 已提交
360 361 362 363
static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
  return sprintf(dst, "%s:%d", topicName, vg);
}

L
Liu Jicong 已提交
364
#if 0
L
Liu Jicong 已提交
365 366
int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
  SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
L
Liu Jicong 已提交
367
  pParam->rspErr = code;
L
Liu Jicong 已提交
368 369
  if (pParam->async) {
    if (pParam->automatic && pParam->tmq->commitCb) {
L
Liu Jicong 已提交
370
      pParam->tmq->commitCb(pParam->tmq, pParam->rspErr, pParam->tmq->commitCbUserParam);
L
Liu Jicong 已提交
371
    } else if (!pParam->automatic && pParam->userCb) {
L
Liu Jicong 已提交
372
      pParam->userCb(pParam->tmq, pParam->rspErr, pParam->userParam);
L
Liu Jicong 已提交
373 374
    }

L
Liu Jicong 已提交
375
    if (pParam->freeOffsets) {
L
Liu Jicong 已提交
376 377 378 379 380 381 382 383 384
      taosArrayDestroy(pParam->offsets);
    }

    taosMemoryFree(pParam);
  } else {
    tsem_post(&pParam->rspSem);
  }
  return 0;
}
L
Liu Jicong 已提交
385
#endif
L
Liu Jicong 已提交
386

387 388 389 390
int32_t tmqCommitCb2(void* param, const SDataBuf* pBuf, int32_t code) {
  SMqCommitCbParam2*   pParam = (SMqCommitCbParam2*)param;
  SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
  // push into array
L
Liu Jicong 已提交
391
#if 0
392 393 394 395 396
  if (code == 0) {
    taosArrayPush(pParamSet->failedOffsets, &pParam->pOffset);
  } else {
    taosArrayPush(pParamSet->successfulOffsets, &pParam->pOffset);
  }
L
Liu Jicong 已提交
397
#endif
L
Liu Jicong 已提交
398

L
Liu Jicong 已提交
399 400 401
  /*tscDebug("receive offset commit cb of %s on vg %d, offset is %ld", pParam->pOffset->subKey, pParam->->vgId,
   * pOffset->version);*/

402
  // count down waiting rsp
L
Liu Jicong 已提交
403
  int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
404 405 406 407 408 409 410
  ASSERT(waitingRspNum >= 0);

  if (waitingRspNum == 0) {
    // if no more waiting rsp
    if (pParamSet->async) {
      // call async cb func
      if (pParamSet->automatic && pParamSet->tmq->commitCb) {
L
Liu Jicong 已提交
411
        pParamSet->tmq->commitCb(pParamSet->tmq, pParamSet->rspErr, pParamSet->tmq->commitCbUserParam);
412 413
      } else if (!pParamSet->automatic && pParamSet->userCb) {
        // sem post
L
Liu Jicong 已提交
414
        pParamSet->userCb(pParamSet->tmq, pParamSet->rspErr, pParamSet->userParam);
415
      }
L
Liu Jicong 已提交
416 417
    } else {
      tsem_post(&pParamSet->rspSem);
418 419
    }

L
Liu Jicong 已提交
420
#if 0
421 422
    taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
    taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
L
Liu Jicong 已提交
423
#endif
424 425 426 427
  }
  return 0;
}

L
Liu Jicong 已提交
428 429 430 431 432 433 434
static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pTopic, SMqCommitCbParamSet* pParamSet) {
  STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
  if (pOffset == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pOffset->val = pVg->currentOffsetNew;
435

L
Liu Jicong 已提交
436 437 438 439
  int32_t groupLen = strlen(tmq->groupId);
  memcpy(pOffset->subKey, tmq->groupId, groupLen);
  pOffset->subKey[groupLen] = TMQ_SEPARATOR;
  strcpy(pOffset->subKey + groupLen + 1, pTopic->topicName);
L
Liu Jicong 已提交
440

L
Liu Jicong 已提交
441 442 443 444 445 446 447 448 449 450
  int32_t len;
  int32_t code;
  tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
  if (code < 0) {
    ASSERT(0);
    return -1;
  }
  void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
  if (buf == NULL) return -1;
  ((SMsgHead*)buf)->vgId = htonl(pVg->vgId);
L
Liu Jicong 已提交
451

L
Liu Jicong 已提交
452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

  SEncoder encoder;
  tEncoderInit(&encoder, abuf, len);
  tEncodeSTqOffset(&encoder, pOffset);

  // build param
  SMqCommitCbParam2* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam2));
  pParam->params = pParamSet;
  pParam->pOffset = pOffset;

  // build send info
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    return -1;
  }
  pMsgSendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = sizeof(SMsgHead) + len,
      .handle = NULL,
  };

  tscDebug("consumer %ld commit offset of %s on vg %d, offset is %ld", tmq->consumerId, pOffset->subKey, pVg->vgId,
           pOffset->val.version);

  // TODO: put into cb
  pVg->committedOffsetNew = pVg->currentOffsetNew;

  pMsgSendInfo->requestId = generateRequestId();
  pMsgSendInfo->requestObjRefId = 0;
  pMsgSendInfo->param = pParam;
  pMsgSendInfo->fp = tmqCommitCb2;
  pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET;
  // send msg

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
  pParamSet->waitingRspNum++;
  pParamSet->totalRspNum++;
  return 0;
}

int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) {
  char*   topic;
  int32_t vgId;
  ASSERT(msg != NULL);
  if (TD_RES_TMQ(msg)) {
    SMqRspObj* pRspObj = (SMqRspObj*)msg;
    topic = pRspObj->topic;
    vgId = pRspObj->vgId;
  } else if (TD_RES_TMQ_META(msg)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg;
    topic = pMetaRspObj->topic;
    vgId = pMetaRspObj->vgId;
  } else {
    return TSDB_CODE_TMQ_INVALID_MSG;
  }

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pParamSet->tmq = tmq;
  pParamSet->automatic = 0;
  pParamSet->async = async;
  /*pParamSet->freeOffsets = 1;*/
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

L
Liu Jicong 已提交
523 524
  int32_t code = -1;

L
Liu Jicong 已提交
525 526 527 528 529 530 531 532 533 534
  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    if (strcmp(pTopic->topicName, topic) != 0) continue;
    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      if (pVg->vgId != vgId) continue;

      if (pVg->currentOffsetNew.type > 0 && !tOffsetEqual(&pVg->currentOffsetNew, &pVg->committedOffsetNew)) {
        if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
          goto FAIL;
L
Liu Jicong 已提交
535
        }
L
Liu Jicong 已提交
536
        goto HANDLE_RSP;
L
Liu Jicong 已提交
537 538
      }
    }
L
Liu Jicong 已提交
539
  }
L
Liu Jicong 已提交
540

L
Liu Jicong 已提交
541 542 543 544
HANDLE_RSP:
  if (pParamSet->totalRspNum == 0) {
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
L
Liu Jicong 已提交
545 546 547
    return 0;
  }

L
Liu Jicong 已提交
548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571
  if (!async) {
    tsem_wait(&pParamSet->rspSem);
    code = pParamSet->rspErr;
    tsem_destroy(&pParamSet->rspSem);
    return code;
  } else {
    code = 0;
  }

FAIL:
  if (code != 0 && async) {
    userCb(tmq, code, userParam);
  }
  return 0;
}

int32_t tmqCommitInner2(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
                        void* userParam) {
  int32_t code = -1;

  if (msg != NULL) {
    return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam);
  }

572 573 574 575 576 577 578 579
  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pParamSet->tmq = tmq;
  pParamSet->automatic = automatic;
  pParamSet->async = async;
L
Liu Jicong 已提交
580
  /*pParamSet->freeOffsets = 1;*/
581 582 583 584 585 586
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
L
Liu Jicong 已提交
587 588 589 590

    tscDebug("consumer %ld begin commit for topic %s, vgNum %d", tmq->consumerId, pTopic->topicName,
             (int32_t)taosArrayGetSize(pTopic->vgs));

591
    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
L
Liu Jicong 已提交
592 593 594 595
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);

      tscDebug("consumer %ld begin commit for topic %s, vgId %d", tmq->consumerId, pTopic->topicName, pVg->vgId);

L
Liu Jicong 已提交
596 597 598 599
      if (pVg->currentOffsetNew.type > 0 && !tOffsetEqual(&pVg->currentOffsetNew, &pVg->committedOffsetNew)) {
        if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
          continue;
        }
600 601 602 603
      }
    }
  }

L
Liu Jicong 已提交
604 605 606 607 608 609
  if (pParamSet->totalRspNum == 0) {
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return 0;
  }

610 611 612 613 614 615 616 617 618 619
  if (!async) {
    tsem_wait(&pParamSet->rspSem);
    code = pParamSet->rspErr;
    tsem_destroy(&pParamSet->rspSem);
  } else {
    code = 0;
  }

  if (code != 0 && async) {
    if (automatic) {
L
Liu Jicong 已提交
620
      tmq->commitCb(tmq, code, tmq->commitCbUserParam);
621
    } else {
L
Liu Jicong 已提交
622
      userCb(tmq, code, userParam);
623 624 625
    }
  }

L
Liu Jicong 已提交
626
#if 0
627 628 629 630
  if (!async) {
    taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
    taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
  }
L
Liu Jicong 已提交
631
#endif
632 633 634 635

  return 0;
}

L
Liu Jicong 已提交
636 637
#if 0
int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async,
L
Liu Jicong 已提交
638 639 640 641 642 643
                       tmq_commit_cb* userCb, void* userParam) {
  SMqCMCommitOffsetReq req;
  SArray*              pOffsets = NULL;
  void*                buf = NULL;
  SMqCommitCbParam*    pParam = NULL;
  SMsgSendInfo*        sendInfo = NULL;
L
Liu Jicong 已提交
644 645
  int8_t               freeOffsets;
  int32_t              code = -1;
L
Liu Jicong 已提交
646

L
Liu Jicong 已提交
647
  if (msg == NULL) {
L
Liu Jicong 已提交
648
    freeOffsets = 1;
L
Liu Jicong 已提交
649 650 651 652 653 654 655 656 657 658 659 660 661 662
    pOffsets = taosArrayInit(0, sizeof(SMqOffset));
    for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
      SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
      for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
        SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
        SMqOffset    offset;
        tstrncpy(offset.topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
        tstrncpy(offset.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
        offset.vgId = pVg->vgId;
        offset.offset = pVg->currentOffset;
        taosArrayPush(pOffsets, &offset);
      }
    }
  } else {
L
Liu Jicong 已提交
663
    freeOffsets = 0;
L
Liu Jicong 已提交
664
    pOffsets = (SArray*)&msg->container;
L
Liu Jicong 已提交
665 666 667 668 669
  }

  req.num = (int32_t)pOffsets->size;
  req.offsets = pOffsets->pData;

L
Liu Jicong 已提交
670 671 672 673
  SEncoder encoder;

  tEncoderInit(&encoder, NULL, 0);
  code = tEncodeSMqCMCommitOffsetReq(&encoder, &req);
L
Liu Jicong 已提交
674 675 676
  if (code < 0) {
    goto END;
  }
L
Liu Jicong 已提交
677
  int32_t tlen = encoder.pos;
L
Liu Jicong 已提交
678 679
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
L
Liu Jicong 已提交
680
    tEncoderClear(&encoder);
L
Liu Jicong 已提交
681 682
    goto END;
  }
L
Liu Jicong 已提交
683 684
  tEncoderClear(&encoder);

L
Liu Jicong 已提交
685 686 687 688 689 690 691 692 693 694 695 696
  tEncoderInit(&encoder, buf, tlen);
  tEncodeSMqCMCommitOffsetReq(&encoder, &req);
  tEncoderClear(&encoder);

  pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
  if (pParam == NULL) {
    goto END;
  }
  pParam->tmq = tmq;
  pParam->automatic = automatic;
  pParam->async = async;
  pParam->offsets = pOffsets;
L
Liu Jicong 已提交
697
  pParam->freeOffsets = freeOffsets;
L
Liu Jicong 已提交
698 699 700 701
  pParam->userCb = userCb;
  pParam->userParam = userParam;
  if (!async) tsem_init(&pParam->rspSem, 0, 0);

702
  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
L
Liu Jicong 已提交
703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725
  if (sendInfo == NULL) goto END;
  sendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqCommitCb;
  sendInfo->msgType = TDMT_MND_MQ_COMMIT_OFFSET;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  if (!async) {
    tsem_wait(&pParam->rspSem);
    code = pParam->rspErr;
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
L
Liu Jicong 已提交
726 727
  } else {
    code = 0;
L
Liu Jicong 已提交
728 729 730 731 732 733 734
  }

  // avoid double free if msg is sent
  buf = NULL;

END:
  if (buf) taosMemoryFree(buf);
L
Liu Jicong 已提交
735 736
  /*if (pParam) taosMemoryFree(pParam);*/
  /*if (sendInfo) taosMemoryFree(sendInfo);*/
L
Liu Jicong 已提交
737 738 739

  if (code != 0 && async) {
    if (automatic) {
L
Liu Jicong 已提交
740
      tmq->commitCb(tmq, code, (tmq_topic_vgroup_list_t*)pOffsets, tmq->commitCbUserParam);
L
Liu Jicong 已提交
741
    } else {
L
Liu Jicong 已提交
742
      userCb(tmq, code, (tmq_topic_vgroup_list_t*)pOffsets, userParam);
L
Liu Jicong 已提交
743 744 745
    }
  }

L
Liu Jicong 已提交
746
  if (!async && freeOffsets) {
L
Liu Jicong 已提交
747 748 749 750
    taosArrayDestroy(pOffsets);
  }
  return code;
}
L
Liu Jicong 已提交
751
#endif
L
Liu Jicong 已提交
752

L
Liu Jicong 已提交
753 754
void tmqAssignDelayedHbTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
755
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
L
Liu Jicong 已提交
756 757
  *pTaskType = TMQ_DELAYED_TASK__HB;
  taosWriteQitem(tmq->delayedTask, pTaskType);
758
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
759 760 761 762
}

void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
763
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
L
Liu Jicong 已提交
764 765
  *pTaskType = TMQ_DELAYED_TASK__COMMIT;
  taosWriteQitem(tmq->delayedTask, pTaskType);
766
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
767 768 769 770
}

void tmqAssignDelayedReportTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
771
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
L
Liu Jicong 已提交
772 773
  *pTaskType = TMQ_DELAYED_TASK__REPORT;
  taosWriteQitem(tmq->delayedTask, pTaskType);
774
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
775 776 777 778 779 780 781 782 783 784 785
}

int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
  STaosQall* qall = taosAllocateQall();
  taosReadAllQitems(tmq->delayedTask, qall);
  while (1) {
    int8_t* pTaskType = NULL;
    taosGetQitem(qall, (void**)&pTaskType);
    if (pTaskType == NULL) break;

    if (*pTaskType == TMQ_DELAYED_TASK__HB) {
L
Liu Jicong 已提交
786
      tmqAskEp(tmq, true);
L
Liu Jicong 已提交
787 788
      taosTmrReset(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer, &tmq->hbTimer);
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
L
Liu Jicong 已提交
789
      tmqCommitInner2(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam);
L
Liu Jicong 已提交
790 791 792 793 794
      taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer, &tmq->commitTimer);
    } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
    } else {
      ASSERT(0);
    }
L
Liu Jicong 已提交
795
    taosFreeQitem(pTaskType);
L
Liu Jicong 已提交
796 797 798 799 800
  }
  taosFreeQall(qall);
  return 0;
}

L
Liu Jicong 已提交
801
void tmqClearUnhandleMsg(tmq_t* tmq) {
L
Liu Jicong 已提交
802
  SMqRspWrapper* msg = NULL;
L
Liu Jicong 已提交
803 804 805 806 807 808 809 810
  while (1) {
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg)
      taosFreeQitem(msg);
    else
      break;
  }

L
fix  
Liu Jicong 已提交
811
  msg = NULL;
L
Liu Jicong 已提交
812 813 814 815 816 817 818 819 820 821
  taosReadAllQitems(tmq->mqueue, tmq->qall);
  while (1) {
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg)
      taosFreeQitem(msg);
    else
      break;
  }
}

L
Liu Jicong 已提交
822 823 824
int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) {
  SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
  pParam->rspErr = code;
L
Liu Jicong 已提交
825
  /*tmq_t* tmq = pParam->tmq;*/
L
Liu Jicong 已提交
826 827 828
  tsem_post(&pParam->rspSem);
  return 0;
}
829

L
Liu Jicong 已提交
830
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
X
Xiaoyu Wang 已提交
831 832 833 834
  if (*topics == NULL) {
    *topics = tmq_list_new();
  }
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
L
Liu Jicong 已提交
835
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
L
Liu Jicong 已提交
836
    tmq_list_append(*topics, strchr(topic->topicName, '.') + 1);
X
Xiaoyu Wang 已提交
837
  }
L
Liu Jicong 已提交
838
  return 0;
X
Xiaoyu Wang 已提交
839 840
}

L
Liu Jicong 已提交
841 842 843
int32_t tmq_unsubscribe(tmq_t* tmq) {
  tmq_list_t* lst = tmq_list_new();
  int32_t     rsp = tmq_subscribe(tmq, lst);
L
Liu Jicong 已提交
844 845
  tmq_list_destroy(lst);
  return rsp;
X
Xiaoyu Wang 已提交
846 847
}

L
Liu Jicong 已提交
848
#if 0
849
tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
wafwerar's avatar
wafwerar 已提交
850
  tmq_t* pTmq = taosMemoryCalloc(sizeof(tmq_t), 1);
851 852 853 854 855 856
  if (pTmq == NULL) {
    return NULL;
  }
  pTmq->pTscObj = (STscObj*)conn;
  pTmq->status = 0;
  pTmq->pollCnt = 0;
L
Liu Jicong 已提交
857
  pTmq->epoch = 0;
L
fix txn  
Liu Jicong 已提交
858
  pTmq->epStatus = 0;
L
temp  
Liu Jicong 已提交
859
  pTmq->epSkipCnt = 0;
L
Liu Jicong 已提交
860
  // set conf
861 862
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
L
Liu Jicong 已提交
863
  pTmq->autoCommit = conf->autoCommit;
864
  pTmq->commit_cb = conf->commit_cb;
L
Liu Jicong 已提交
865
  pTmq->resetOffsetCfg = conf->resetOffset;
L
Liu Jicong 已提交
866

L
Liu Jicong 已提交
867 868 869 870 871 872 873 874 875 876
  pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1);
  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  if (pTmq->clientTopics == NULL) {
    taosMemoryFree(pTmq);
    return NULL;
  }

  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();

L
Liu Jicong 已提交
877 878
  tsem_init(&pTmq->rspSem, 0, 0);

L
Liu Jicong 已提交
879 880
  return pTmq;
}
L
Liu Jicong 已提交
881
#endif
L
Liu Jicong 已提交
882

L
Liu Jicong 已提交
883
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
L
Liu Jicong 已提交
884 885 886 887 888 889 890 891 892 893 894
  // init timer
  int8_t inited = atomic_val_compare_exchange_8(&tmqMgmt.inited, 0, 1);
  if (inited == 0) {
    tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
    if (tmqMgmt.timer == NULL) {
      atomic_store_8(&tmqMgmt.inited, 0);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
  }

L
Liu Jicong 已提交
895 896 897 898
  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    return NULL;
  }
L
Liu Jicong 已提交
899

L
Liu Jicong 已提交
900 901 902 903 904
  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  ASSERT(user);
  ASSERT(pass);
L
Liu Jicong 已提交
905
  ASSERT(conf->groupId[0]);
L
Liu Jicong 已提交
906

L
Liu Jicong 已提交
907 908 909 910 911 912 913 914
  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();
  pTmq->delayedTask = taosOpenQueue();

  if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
    goto FAIL;
  }
L
Liu Jicong 已提交
915

L
Liu Jicong 已提交
916 917
  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
L
Liu Jicong 已提交
918 919
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;
L
Liu Jicong 已提交
920 921
  /*pTmq->epStatus = 0;*/
  /*pTmq->epSkipCnt = 0;*/
L
Liu Jicong 已提交
922

L
Liu Jicong 已提交
923 924 925
  // set conf
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
926
  pTmq->withTbName = conf->withTbName;
L
Liu Jicong 已提交
927
  pTmq->useSnapshot = conf->spEnable;
L
Liu Jicong 已提交
928
  pTmq->autoCommit = conf->autoCommit;
L
Liu Jicong 已提交
929
  pTmq->autoCommitInterval = conf->autoCommitInterval;
L
Liu Jicong 已提交
930 931
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
L
Liu Jicong 已提交
932 933
  pTmq->resetOffsetCfg = conf->resetOffset;

L
Liu Jicong 已提交
934
  // assign consumerId
L
Liu Jicong 已提交
935
  pTmq->consumerId = tGenIdPI64();
X
Xiaoyu Wang 已提交
936

L
Liu Jicong 已提交
937 938 939 940
  // init semaphore
  if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
    goto FAIL;
  }
L
Liu Jicong 已提交
941

L
Liu Jicong 已提交
942 943 944 945 946 947
  // init connection
  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) {
    tsem_destroy(&pTmq->rspSem);
    goto FAIL;
  }
L
Liu Jicong 已提交
948

949
  return pTmq;
L
Liu Jicong 已提交
950 951 952 953 954 955 956 957

FAIL:
  if (pTmq->clientTopics) taosArrayDestroy(pTmq->clientTopics);
  if (pTmq->mqueue) taosCloseQueue(pTmq->mqueue);
  if (pTmq->delayedTask) taosCloseQueue(pTmq->delayedTask);
  if (pTmq->qall) taosFreeQall(pTmq->qall);
  taosMemoryFree(pTmq);
  return NULL;
958 959
}

L
Liu Jicong 已提交
960 961
#if 0
int32_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) {
L
Liu Jicong 已提交
962
  return tmqCommitInner2(tmq, offsets, 0, async, tmq->commitCb, tmq->commitCbUserParam);
L
Liu Jicong 已提交
963
}
L
Liu Jicong 已提交
964
#endif
L
Liu Jicong 已提交
965

L
Liu Jicong 已提交
966
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
L
Liu Jicong 已提交
967 968 969 970 971
  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
  SCMSubscribeReq req = {0};
  int32_t         code = -1;
972 973

  req.consumerId = tmq->consumerId;
L
Liu Jicong 已提交
974
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
975
  req.topicNames = taosArrayInit(sz, sizeof(void*));
L
Liu Jicong 已提交
976
  if (req.topicNames == NULL) goto FAIL;
977

L
Liu Jicong 已提交
978 979
  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);
980 981

    SName name = {0};
L
Liu Jicong 已提交
982
    tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));
983

L
Liu Jicong 已提交
984 985 986
    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      goto FAIL;
987
    }
L
Liu Jicong 已提交
988
    tNameExtractFullName(&name, topicFName);
989

L
Liu Jicong 已提交
990 991 992
    tscDebug("subscribe topic: %s", topicFName);

    taosArrayPush(req.topicNames, &topicFName);
993 994
  }

L
Liu Jicong 已提交
995 996 997 998
  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto FAIL;

999 1000 1001
  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);

1002
  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
L
Liu Jicong 已提交
1003
  if (sendInfo == NULL) goto FAIL;
1004

X
Xiaoyu Wang 已提交
1005
  SMqSubscribeCbParam param = {
L
Liu Jicong 已提交
1006
      .rspErr = 0,
X
Xiaoyu Wang 已提交
1007 1008
      .tmq = tmq,
  };
L
Liu Jicong 已提交
1009

L
Liu Jicong 已提交
1010 1011 1012
  if (tsem_init(&param.rspSem, 0, 0) != 0) goto FAIL;

  sendInfo->msgInfo = (SDataBuf){
X
Xiaoyu Wang 已提交
1013 1014 1015 1016
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };
1017

L
Liu Jicong 已提交
1018 1019
  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
L
Liu Jicong 已提交
1020 1021
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
L
Liu Jicong 已提交
1022 1023
  sendInfo->msgType = TDMT_MND_SUBSCRIBE;

1024 1025 1026 1027 1028
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

L
Liu Jicong 已提交
1029 1030 1031
  // avoid double free if msg is sent
  buf = NULL;

L
Liu Jicong 已提交
1032 1033
  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);
1034

L
Liu Jicong 已提交
1035 1036 1037
  code = param.rspErr;
  if (code != 0) goto FAIL;

L
Liu Jicong 已提交
1038
  while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
L
fix  
Liu Jicong 已提交
1039
    tscDebug("consumer not ready, retry");
L
Liu Jicong 已提交
1040 1041
    taosMsleep(500);
  }
1042

L
Liu Jicong 已提交
1043
  // init hb timer
1044 1045 1046
  if (tmq->hbTimer == NULL) {
    tmq->hbTimer = taosTmrStart(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer);
  }
L
Liu Jicong 已提交
1047 1048

  // init auto commit timer
1049
  if (tmq->autoCommit && tmq->commitTimer == NULL) {
L
Liu Jicong 已提交
1050 1051 1052
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer);
  }

L
Liu Jicong 已提交
1053 1054 1055
  code = 0;
FAIL:
  if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree);
L
Liu Jicong 已提交
1056
  if (code != 0 && buf) {
L
Liu Jicong 已提交
1057 1058 1059
    taosMemoryFree(buf);
  }
  return code;
1060 1061
}

L
Liu Jicong 已提交
1062
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
L
Liu Jicong 已提交
1063
  //
1064
  conf->commitCb = cb;
L
Liu Jicong 已提交
1065
  conf->commitCbUserParam = param;
L
Liu Jicong 已提交
1066
}
1067

L
Liu Jicong 已提交
1068
#if 0
L
Liu Jicong 已提交
1069 1070
int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) {
  if (tmq_message == NULL) return 0;
L
Liu Jicong 已提交
1071
  SMqPollRsp* pRsp = &tmq_message->msg;
L
Liu Jicong 已提交
1072 1073
  return pRsp->skipLogNum;
}
L
Liu Jicong 已提交
1074
#endif
L
Liu Jicong 已提交
1075 1076

int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
X
Xiaoyu Wang 已提交
1077 1078
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;
  SMqClientVg*    pVg = pParam->pVg;
L
Liu Jicong 已提交
1079
  SMqClientTopic* pTopic = pParam->pTopic;
X
Xiaoyu Wang 已提交
1080
  tmq_t*          tmq = pParam->tmq;
L
Liu Jicong 已提交
1081 1082 1083
  int32_t         vgId = pParam->vgId;
  int32_t         epoch = pParam->epoch;
  taosMemoryFree(pParam);
L
Liu Jicong 已提交
1084
  if (code != 0) {
L
Liu Jicong 已提交
1085 1086
    tscWarn("msg discard from vg %d, epoch %d, code:%x", vgId, epoch, code);
    if (pMsg->pData) taosMemoryFree(pMsg->pData);
L
Liu Jicong 已提交
1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099
    if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
      if (pRspWrapper == NULL) {
        taosMemoryFree(pMsg->pData);
        tscWarn("msg discard from vg %d, epoch %d since out of memory", vgId, epoch);
        goto CREATE_MSG_FAIL;
      }
      pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
      /*pRspWrapper->vgHandle = pVg;*/
      /*pRspWrapper->topicHandle = pTopic;*/
      taosWriteQitem(tmq->mqueue, pRspWrapper);
      tsem_post(&tmq->rspSem);
    }
L
fix txn  
Liu Jicong 已提交
1100
    goto CREATE_MSG_FAIL;
L
Liu Jicong 已提交
1101 1102
  }

X
Xiaoyu Wang 已提交
1103 1104 1105
  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch < tmqEpoch) {
L
Liu Jicong 已提交
1106
    // do not write into queue since updating epoch reset
L
Liu Jicong 已提交
1107
    tscWarn("msg discard from vg %d since from earlier epoch, rsp epoch %d, current epoch %d", vgId, msgEpoch,
L
Liu Jicong 已提交
1108
            tmqEpoch);
1109
    tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
1110
    taosMemoryFree(pMsg->pData);
X
Xiaoyu Wang 已提交
1111 1112 1113 1114
    return 0;
  }

  if (msgEpoch != tmqEpoch) {
L
Liu Jicong 已提交
1115
    tscWarn("mismatch rsp from vg %d, epoch %d, current epoch %d", vgId, msgEpoch, tmqEpoch);
X
Xiaoyu Wang 已提交
1116 1117
  }

L
Liu Jicong 已提交
1118 1119 1120
  // handle meta rsp
  int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;

1121
  SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
L
Liu Jicong 已提交
1122
  if (pRspWrapper == NULL) {
L
Liu Jicong 已提交
1123 1124
    taosMemoryFree(pMsg->pData);
    tscWarn("msg discard from vg %d, epoch %d since out of memory", vgId, epoch);
L
fix txn  
Liu Jicong 已提交
1125
    goto CREATE_MSG_FAIL;
L
Liu Jicong 已提交
1126
  }
L
Liu Jicong 已提交
1127

L
Liu Jicong 已提交
1128
  pRspWrapper->tmqRspType = rspType;
L
Liu Jicong 已提交
1129 1130
  pRspWrapper->vgHandle = pVg;
  pRspWrapper->topicHandle = pTopic;
L
Liu Jicong 已提交
1131

L
Liu Jicong 已提交
1132
  if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
L
Liu Jicong 已提交
1133 1134 1135
    SDecoder decoder;
    tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
    tDecodeSMqDataRsp(&decoder, &pRspWrapper->dataRsp);
L
Liu Jicong 已提交
1136
    memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));
L
Liu Jicong 已提交
1137
    /*tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->dataRsp);*/
L
Liu Jicong 已提交
1138 1139
  } else {
    ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP);
L
Liu Jicong 已提交
1140
    memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
L
Liu Jicong 已提交
1141 1142
    tDecodeSMqMetaRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->metaRsp);
  }
L
Liu Jicong 已提交
1143

L
Liu Jicong 已提交
1144
  taosMemoryFree(pMsg->pData);
L
Liu Jicong 已提交
1145

L
Liu Jicong 已提交
1146
  tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld, type %d", tmq->consumerId, pVg->vgId,
L
Liu Jicong 已提交
1147
           pRspWrapper->dataRsp.reqOffset.version, pRspWrapper->dataRsp.rspOffset.version, rspType);
L
fix  
Liu Jicong 已提交
1148

L
Liu Jicong 已提交
1149
  taosWriteQitem(tmq->mqueue, pRspWrapper);
1150
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
1151

L
Liu Jicong 已提交
1152
  return 0;
L
fix txn  
Liu Jicong 已提交
1153
CREATE_MSG_FAIL:
L
Liu Jicong 已提交
1154
  if (epoch == tmq->epoch) {
L
Liu Jicong 已提交
1155 1156
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }
1157
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
1158
  return -1;
1159 1160
}

1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188
bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
  bool set = false;

  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];
  tscDebug("consumer %ld update ep epoch %d to epoch %d, topic num: %d", tmq->consumerId, tmq->epoch, epoch,
           topicNumGet);

  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    return false;
  }

  SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pHash == NULL) {
    taosArrayDestroy(newTopics);
    return false;
  }
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < topicNumCur; i++) {
    // find old topic
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
    if (pTopicCur->vgs) {
      int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
      tscDebug("consumer %ld new vg num: %d", tmq->consumerId, vgNumCur);
      for (int32_t j = 0; j < vgNumCur; j++) {
        SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
        sprintf(vgKey, "%s:%d", pTopicCur->topicName, pVgCur->vgId);
L
Liu Jicong 已提交
1189 1190 1191 1192 1193
        char buf[50];
        tFormatOffset(buf, 50, &pVgCur->currentOffsetNew);
        tscDebug("consumer %ld epoch %d vg %d vgKey is %s, offset is %s", tmq->consumerId, epoch, pVgCur->vgId, vgKey,
                 buf);
        taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffsetNew, sizeof(STqOffsetVal));
1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211
      }
    }
  }

  for (int32_t i = 0; i < topicNumGet; i++) {
    SMqClientTopic topic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
    topic.schema = pTopicEp->schema;
    topic.topicName = strdup(pTopicEp->topic);
    tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);

    tscDebug("consumer %ld update topic: %s", tmq->consumerId, topic.topicName);

    int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
    topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
    for (int32_t j = 0; j < vgNumGet; j++) {
      SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
      sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId);
L
Liu Jicong 已提交
1212 1213
      STqOffsetVal* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
      STqOffsetVal  offsetNew = {.type = tmq->resetOffsetCfg};
1214
      if (pOffset != NULL) {
L
Liu Jicong 已提交
1215
        offsetNew = *pOffset;
1216 1217
      }

L
Liu Jicong 已提交
1218 1219
      /*tscDebug("consumer %ld(epoch %d) offset of vg %d updated to %ld, vgKey is %s", tmq->consumerId, epoch,*/
      /*pVgEp->vgId, offset, vgKey);*/
1220 1221
      SMqClientVg clientVg = {
          .pollCnt = 0,
L
Liu Jicong 已提交
1222
          .currentOffsetNew = offsetNew,
1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245
          .vgId = pVgEp->vgId,
          .epSet = pVgEp->epSet,
          .vgStatus = TMQ_VG_STATUS__IDLE,
          .vgSkipCnt = 0,
      };
      taosArrayPush(topic.vgs, &clientVg);
      set = true;
    }
    taosArrayPush(newTopics, &topic);
  }
  if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
  taosHashCleanup(pHash);
  tmq->clientTopics = newTopics;

  if (taosArrayGetSize(tmq->clientTopics) == 0)
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
  else
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);

  atomic_store_32(&tmq->epoch, epoch);
  return set;
}

L
Liu Jicong 已提交
1246
#if 0
L
Liu Jicong 已提交
1247
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
L
Liu Jicong 已提交
1248
  /*printf("call update ep %d\n", epoch);*/
X
Xiaoyu Wang 已提交
1249
  bool    set = false;
L
Liu Jicong 已提交
1250 1251
  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];
L
Liu Jicong 已提交
1252 1253
  tscDebug("consumer %ld update ep epoch %d to epoch %d, topic num: %d", tmq->consumerId, tmq->epoch, epoch,
           topicNumGet);
L
Liu Jicong 已提交
1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265
  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    return false;
  }
  SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pHash == NULL) {
    taosArrayDestroy(newTopics);
    return false;
  }

  // find topic, build hash
  for (int32_t i = 0; i < topicNumGet; i++) {
X
Xiaoyu Wang 已提交
1266 1267
    SMqClientTopic topic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
L
Liu Jicong 已提交
1268
    topic.schema = pTopicEp->schema;
L
Liu Jicong 已提交
1269
    taosHashClear(pHash);
X
Xiaoyu Wang 已提交
1270
    topic.topicName = strdup(pTopicEp->topic);
L
Liu Jicong 已提交
1271
    tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);
L
Liu Jicong 已提交
1272

L
Liu Jicong 已提交
1273
    tscDebug("consumer %ld update topic: %s", tmq->consumerId, topic.topicName);
L
Liu Jicong 已提交
1274 1275 1276 1277 1278 1279
    int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
    for (int32_t j = 0; j < topicNumCur; j++) {
      // find old topic
      SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j);
      if (pTopicCur->vgs && strcmp(pTopicCur->topicName, pTopicEp->topic) == 0) {
        int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
L
Liu Jicong 已提交
1280
        tscDebug("consumer %ld new vg num: %d", tmq->consumerId, vgNumCur);
L
Liu Jicong 已提交
1281 1282 1283 1284
        if (vgNumCur == 0) break;
        for (int32_t k = 0; k < vgNumCur; k++) {
          SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, k);
          sprintf(vgKey, "%s:%d", topic.topicName, pVgCur->vgId);
L
Liu Jicong 已提交
1285
          tscDebug("consumer %ld epoch %d vg %d build %s", tmq->consumerId, epoch, pVgCur->vgId, vgKey);
L
Liu Jicong 已提交
1286 1287 1288 1289 1290 1291 1292 1293 1294
          taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t));
        }
        break;
      }
    }

    int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
    topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
    for (int32_t j = 0; j < vgNumGet; j++) {
X
Xiaoyu Wang 已提交
1295
      SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
L
Liu Jicong 已提交
1296 1297 1298
      sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId);
      int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
      int64_t  offset = pVgEp->offset;
1299
      tscDebug("consumer %ld(epoch %d) original offset of vg %d is %ld", tmq->consumerId, epoch, pVgEp->vgId, offset);
L
Liu Jicong 已提交
1300 1301
      if (pOffset != NULL) {
        offset = *pOffset;
1302 1303
        tscDebug("consumer %ld(epoch %d) receive offset of vg %d, full key is %s", tmq->consumerId, epoch, pVgEp->vgId,
                 vgKey);
L
Liu Jicong 已提交
1304
      }
1305
      tscDebug("consumer %ld(epoch %d) offset of vg %d updated to %ld", tmq->consumerId, epoch, pVgEp->vgId, offset);
X
Xiaoyu Wang 已提交
1306 1307
      SMqClientVg clientVg = {
          .pollCnt = 0,
L
Liu Jicong 已提交
1308
          .currentOffset = offset,
X
Xiaoyu Wang 已提交
1309 1310 1311
          .vgId = pVgEp->vgId,
          .epSet = pVgEp->epSet,
          .vgStatus = TMQ_VG_STATUS__IDLE,
L
Liu Jicong 已提交
1312
          .vgSkipCnt = 0,
X
Xiaoyu Wang 已提交
1313 1314 1315 1316
      };
      taosArrayPush(topic.vgs, &clientVg);
      set = true;
    }
L
Liu Jicong 已提交
1317
    taosArrayPush(newTopics, &topic);
X
Xiaoyu Wang 已提交
1318
  }
L
Liu Jicong 已提交
1319
  if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
L
Liu Jicong 已提交
1320
  taosHashCleanup(pHash);
L
Liu Jicong 已提交
1321
  tmq->clientTopics = newTopics;
1322

1323 1324 1325 1326
  if (taosArrayGetSize(tmq->clientTopics) == 0)
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
  else
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
1327

X
Xiaoyu Wang 已提交
1328 1329 1330
  atomic_store_32(&tmq->epoch, epoch);
  return set;
}
L
Liu Jicong 已提交
1331
#endif
X
Xiaoyu Wang 已提交
1332

L
Liu Jicong 已提交
1333
int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
1334
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
L
Liu Jicong 已提交
1335
  tmq_t*           tmq = pParam->tmq;
L
Liu Jicong 已提交
1336
  int8_t           async = pParam->async;
L
Liu Jicong 已提交
1337
  pParam->code = code;
1338
  if (code != 0) {
L
Liu Jicong 已提交
1339
    tscError("consumer %ld get topic endpoint error, not ready, wait:%d", tmq->consumerId, pParam->async);
L
Liu Jicong 已提交
1340
    goto END;
1341
  }
L
Liu Jicong 已提交
1342

L
Liu Jicong 已提交
1343
  // tmq's epoch is monotonically increase,
L
Liu Jicong 已提交
1344
  // so it's safe to discard any old epoch msg.
L
Liu Jicong 已提交
1345
  // Epoch will only increase when received newer epoch ep msg
L
Liu Jicong 已提交
1346 1347
  SMqRspHead* head = pMsg->pData;
  int32_t     epoch = atomic_load_32(&tmq->epoch);
L
temp  
Liu Jicong 已提交
1348
  tscDebug("consumer %ld recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
L
Liu Jicong 已提交
1349 1350
  if (head->epoch <= epoch) {
    goto END;
1351
  }
L
Liu Jicong 已提交
1352

L
Liu Jicong 已提交
1353
  if (!async) {
L
Liu Jicong 已提交
1354 1355
    SMqAskEpRsp rsp;
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
X
Xiaoyu Wang 已提交
1356 1357
    /*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/
    /*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/
L
Liu Jicong 已提交
1358
    tmqUpdateEp2(tmq, head->epoch, &rsp);
L
Liu Jicong 已提交
1359
    tDeleteSMqAskEpRsp(&rsp);
X
Xiaoyu Wang 已提交
1360
  } else {
1361
    SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM);
L
Liu Jicong 已提交
1362
    if (pWrapper == NULL) {
X
Xiaoyu Wang 已提交
1363
      terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
1364 1365
      code = -1;
      goto END;
X
Xiaoyu Wang 已提交
1366
    }
L
Liu Jicong 已提交
1367 1368 1369
    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pWrapper->epoch = head->epoch;
    memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
L
Liu Jicong 已提交
1370
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);
L
Liu Jicong 已提交
1371

L
Liu Jicong 已提交
1372
    taosWriteQitem(tmq->mqueue, pWrapper);
1373
    tsem_post(&tmq->rspSem);
1374
  }
L
Liu Jicong 已提交
1375 1376

END:
L
Liu Jicong 已提交
1377
  /*atomic_store_8(&tmq->epStatus, 0);*/
L
Liu Jicong 已提交
1378
  if (!async) {
L
Liu Jicong 已提交
1379
    tsem_post(&pParam->rspSem);
L
Liu Jicong 已提交
1380 1381
  } else {
    taosMemoryFree(pParam);
L
Liu Jicong 已提交
1382 1383
  }
  return code;
1384 1385
}

L
Liu Jicong 已提交
1386
int32_t tmqAskEp(tmq_t* tmq, bool async) {
L
Liu Jicong 已提交
1387
  int32_t code = 0;
L
Liu Jicong 已提交
1388
#if 0
L
Liu Jicong 已提交
1389
  int8_t  epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
L
fix txn  
Liu Jicong 已提交
1390
  if (epStatus == 1) {
L
temp  
Liu Jicong 已提交
1391
    int32_t epSkipCnt = atomic_add_fetch_32(&tmq->epSkipCnt, 1);
L
Liu Jicong 已提交
1392
    tscTrace("consumer %ld skip ask ep cnt %d", tmq->consumerId, epSkipCnt);
L
Liu Jicong 已提交
1393
    if (epSkipCnt < 5000) return 0;
L
fix txn  
Liu Jicong 已提交
1394
  }
L
temp  
Liu Jicong 已提交
1395
  atomic_store_32(&tmq->epSkipCnt, 0);
L
Liu Jicong 已提交
1396
#endif
L
Liu Jicong 已提交
1397
  int32_t      tlen = sizeof(SMqAskEpReq);
L
Liu Jicong 已提交
1398
  SMqAskEpReq* req = taosMemoryCalloc(1, tlen);
L
Liu Jicong 已提交
1399
  if (req == NULL) {
L
Liu Jicong 已提交
1400
    tscError("failed to malloc get subscribe ep buf");
L
Liu Jicong 已提交
1401
    /*atomic_store_8(&tmq->epStatus, 0);*/
L
Liu Jicong 已提交
1402
    return -1;
L
Liu Jicong 已提交
1403
  }
L
Liu Jicong 已提交
1404 1405 1406
  req->consumerId = htobe64(tmq->consumerId);
  req->epoch = htonl(tmq->epoch);
  strcpy(req->cgroup, tmq->groupId);
1407

L
Liu Jicong 已提交
1408
  SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
L
Liu Jicong 已提交
1409 1410
  if (pParam == NULL) {
    tscError("failed to malloc subscribe param");
wafwerar's avatar
wafwerar 已提交
1411
    taosMemoryFree(req);
L
Liu Jicong 已提交
1412
    /*atomic_store_8(&tmq->epStatus, 0);*/
L
Liu Jicong 已提交
1413
    return -1;
L
Liu Jicong 已提交
1414 1415
  }
  pParam->tmq = tmq;
L
Liu Jicong 已提交
1416
  pParam->async = async;
X
Xiaoyu Wang 已提交
1417
  tsem_init(&pParam->rspSem, 0, 0);
1418

1419
  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
L
Liu Jicong 已提交
1420 1421
  if (sendInfo == NULL) {
    tsem_destroy(&pParam->rspSem);
wafwerar's avatar
wafwerar 已提交
1422 1423
    taosMemoryFree(pParam);
    taosMemoryFree(req);
L
Liu Jicong 已提交
1424
    /*atomic_store_8(&tmq->epStatus, 0);*/
L
Liu Jicong 已提交
1425 1426 1427 1428 1429 1430 1431 1432 1433 1434
    return -1;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = req,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
L
Liu Jicong 已提交
1435 1436 1437
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqAskEpCb;
L
Liu Jicong 已提交
1438
  sendInfo->msgType = TDMT_MND_MQ_ASK_EP;
1439

L
Liu Jicong 已提交
1440 1441
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

L
add log  
Liu Jicong 已提交
1442 1443
  tscDebug("consumer %ld ask ep", tmq->consumerId);

L
Liu Jicong 已提交
1444 1445
  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
1446

L
Liu Jicong 已提交
1447
  if (!async) {
L
Liu Jicong 已提交
1448 1449 1450 1451 1452
    tsem_wait(&pParam->rspSem);
    code = pParam->code;
    taosMemoryFree(pParam);
  }
  return code;
1453 1454
}

L
Liu Jicong 已提交
1455 1456
#if 0
int32_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
L
Liu Jicong 已提交
1457 1458 1459 1460 1461 1462 1463 1464 1465 1466 1467 1468 1469
  const SMqOffset* pOffset = &offset->offset;
  if (strcmp(pOffset->cgroup, tmq->groupId) != 0) {
    return TMQ_RESP_ERR__FAIL;
  }
  int32_t sz = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < sz; i++) {
    SMqClientTopic* clientTopic = taosArrayGet(tmq->clientTopics, i);
    if (strcmp(clientTopic->topicName, pOffset->topicName) == 0) {
      int32_t vgSz = taosArrayGetSize(clientTopic->vgs);
      for (int32_t j = 0; j < vgSz; j++) {
        SMqClientVg* pVg = taosArrayGet(clientTopic->vgs, j);
        if (pVg->vgId == pOffset->vgId) {
          pVg->currentOffset = pOffset->offset;
L
Liu Jicong 已提交
1470
          tmqClearUnhandleMsg(tmq);
L
Liu Jicong 已提交
1471 1472 1473 1474 1475 1476 1477
          return TMQ_RESP_ERR__SUCCESS;
        }
      }
    }
  }
  return TMQ_RESP_ERR__FAIL;
}
L
Liu Jicong 已提交
1478
#endif
L
Liu Jicong 已提交
1479

1480
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
L
Liu Jicong 已提交
1481 1482 1483 1484 1485 1486 1487 1488 1489 1490
  /*int64_t reqOffset;*/
  /*if (pVg->currentOffset >= 0) {*/
  /*reqOffset = pVg->currentOffset;*/
  /*} else {*/
  /*if (tmq->resetOffsetCfg == TMQ_CONF__RESET_OFFSET__NONE) {*/
  /*tscError("unable to poll since no committed offset but reset offset is set to none");*/
  /*return NULL;*/
  /*}*/
  /*reqOffset = tmq->resetOffsetCfg;*/
  /*}*/
L
Liu Jicong 已提交
1491

L
Liu Jicong 已提交
1492
  SMqPollReq* pReq = taosMemoryCalloc(1, sizeof(SMqPollReq));
1493 1494 1495
  if (pReq == NULL) {
    return NULL;
  }
L
Liu Jicong 已提交
1496

L
Liu Jicong 已提交
1497 1498 1499
  /*strcpy(pReq->topic, pTopic->topicName);*/
  /*strcpy(pReq->cgroup, tmq->groupId);*/

L
Liu Jicong 已提交
1500 1501 1502 1503
  int32_t groupLen = strlen(tmq->groupId);
  memcpy(pReq->subKey, tmq->groupId, groupLen);
  pReq->subKey[groupLen] = TMQ_SEPARATOR;
  strcpy(pReq->subKey + groupLen + 1, pTopic->topicName);
1504

1505
  pReq->withTbName = tmq->withTbName;
1506
  pReq->timeout = timeout;
L
Liu Jicong 已提交
1507
  pReq->consumerId = tmq->consumerId;
X
Xiaoyu Wang 已提交
1508
  pReq->epoch = tmq->epoch;
L
Liu Jicong 已提交
1509 1510
  /*pReq->currentOffset = reqOffset;*/
  pReq->reqOffset = pVg->currentOffsetNew;
L
Liu Jicong 已提交
1511
  pReq->reqId = generateRequestId();
1512

L
Liu Jicong 已提交
1513 1514
  pReq->useSnapshot = tmq->useSnapshot;

1515
  pReq->head.vgId = htonl(pVg->vgId);
L
Liu Jicong 已提交
1516
  pReq->head.contLen = htonl(sizeof(SMqPollReq));
1517 1518 1519
  return pReq;
}

L
Liu Jicong 已提交
1520 1521
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
L
Liu Jicong 已提交
1522
  pRspObj->resType = RES_TYPE__TMQ_META;
L
Liu Jicong 已提交
1523 1524 1525 1526 1527 1528 1529 1530
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;

  memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp));
  return pRspObj;
}

L
Liu Jicong 已提交
1531 1532 1533
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  pRspObj->resType = RES_TYPE__TMQ;
L
Liu Jicong 已提交
1534 1535
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
L
Liu Jicong 已提交
1536
  pRspObj->vgId = pWrapper->vgHandle->vgId;
L
Liu Jicong 已提交
1537
  pRspObj->resIter = -1;
L
Liu Jicong 已提交
1538
  memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));
L
Liu Jicong 已提交
1539

L
Liu Jicong 已提交
1540 1541
  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
L
Liu Jicong 已提交
1542
  if (!pWrapper->dataRsp.withSchema) {
L
Liu Jicong 已提交
1543 1544
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }
L
Liu Jicong 已提交
1545

L
Liu Jicong 已提交
1546
  return pRspObj;
X
Xiaoyu Wang 已提交
1547 1548
}

1549
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
L
fix  
Liu Jicong 已提交
1550
  /*printf("call poll\n");*/
X
Xiaoyu Wang 已提交
1551 1552 1553 1554 1555 1556
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      int32_t      vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
      if (vgStatus != TMQ_VG_STATUS__IDLE) {
L
Liu Jicong 已提交
1557
        int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
L
Liu Jicong 已提交
1558
        tscTrace("consumer %ld epoch %d skip vg %d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId, vgSkipCnt);
X
Xiaoyu Wang 已提交
1559
        continue;
L
Liu Jicong 已提交
1560
        /*if (vgSkipCnt < 10000) continue;*/
L
temp  
Liu Jicong 已提交
1561 1562 1563 1564 1565 1566 1567
#if 0
        if (skipCnt < 30000) {
          continue;
        } else {
        tscDebug("consumer %ld skip vg %d skip too much reset", tmq->consumerId, pVg->vgId);
        }
#endif
X
Xiaoyu Wang 已提交
1568
      }
L
Liu Jicong 已提交
1569
      atomic_store_32(&pVg->vgSkipCnt, 0);
1570
      SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, timeout, pTopic, pVg);
X
Xiaoyu Wang 已提交
1571 1572
      if (pReq == NULL) {
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
1573
        tsem_post(&tmq->rspSem);
X
Xiaoyu Wang 已提交
1574 1575
        return -1;
      }
wafwerar's avatar
wafwerar 已提交
1576
      SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
L
Liu Jicong 已提交
1577
      if (pParam == NULL) {
wafwerar's avatar
wafwerar 已提交
1578
        taosMemoryFree(pReq);
X
Xiaoyu Wang 已提交
1579
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
1580
        tsem_post(&tmq->rspSem);
X
Xiaoyu Wang 已提交
1581 1582
        return -1;
      }
L
Liu Jicong 已提交
1583 1584
      pParam->tmq = tmq;
      pParam->pVg = pVg;
L
Liu Jicong 已提交
1585
      pParam->pTopic = pTopic;
L
Liu Jicong 已提交
1586
      pParam->vgId = pVg->vgId;
L
Liu Jicong 已提交
1587 1588
      pParam->epoch = tmq->epoch;

1589
      SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
L
Liu Jicong 已提交
1590
      if (sendInfo == NULL) {
wafwerar's avatar
wafwerar 已提交
1591 1592
        taosMemoryFree(pReq);
        taosMemoryFree(pParam);
L
Liu Jicong 已提交
1593
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
1594
        tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
1595 1596 1597 1598
        return -1;
      }

      sendInfo->msgInfo = (SDataBuf){
X
Xiaoyu Wang 已提交
1599
          .pData = pReq,
L
Liu Jicong 已提交
1600
          .len = sizeof(SMqPollReq),
X
Xiaoyu Wang 已提交
1601 1602
          .handle = NULL,
      };
L
Liu Jicong 已提交
1603
      sendInfo->requestId = pReq->reqId;
X
Xiaoyu Wang 已提交
1604
      sendInfo->requestObjRefId = 0;
L
Liu Jicong 已提交
1605
      sendInfo->param = pParam;
X
Xiaoyu Wang 已提交
1606
      sendInfo->fp = tmqPollCb;
L
Liu Jicong 已提交
1607
      sendInfo->msgType = TDMT_VND_CONSUME;
X
Xiaoyu Wang 已提交
1608 1609

      int64_t transporterId = 0;
L
fix  
Liu Jicong 已提交
1610
      /*printf("send poll\n");*/
L
Liu Jicong 已提交
1611 1612 1613 1614 1615

      char offsetFormatBuf[50];
      tFormatOffset(offsetFormatBuf, 50, &pVg->currentOffsetNew);
      tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %s, reqId %lu", tmq->consumerId,
               pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, pReq->reqId);
L
fix  
Liu Jicong 已提交
1616
      /*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/
X
Xiaoyu Wang 已提交
1617 1618 1619 1620 1621 1622 1623 1624
      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
      pVg->pollCnt++;
      tmq->pollCnt++;
    }
  }
  return 0;
}

L
Liu Jicong 已提交
1625 1626
int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
L
fix  
Liu Jicong 已提交
1627
    /*printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);*/
L
Liu Jicong 已提交
1628 1629
    if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) {
      SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
L
Liu Jicong 已提交
1630
      SMqAskEpRsp*        rspMsg = &pEpRspWrapper->msg;
L
Liu Jicong 已提交
1631
      tmqUpdateEp2(tmq, rspWrapper->epoch, rspMsg);
L
temp  
Liu Jicong 已提交
1632
      /*tmqClearUnhandleMsg(tmq);*/
X
Xiaoyu Wang 已提交
1633 1634 1635 1636 1637 1638 1639 1640 1641 1642
      *pReset = true;
    } else {
      *pReset = false;
    }
  } else {
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
1643
void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
X
Xiaoyu Wang 已提交
1644
  while (1) {
L
Liu Jicong 已提交
1645 1646 1647
    SMqRspWrapper* rspWrapper = NULL;
    taosGetQitem(tmq->qall, (void**)&rspWrapper);
    if (rspWrapper == NULL) {
L
Liu Jicong 已提交
1648
      taosReadAllQitems(tmq->mqueue, tmq->qall);
L
Liu Jicong 已提交
1649 1650
      taosGetQitem(tmq->qall, (void**)&rspWrapper);
      if (rspWrapper == NULL) return NULL;
X
Xiaoyu Wang 已提交
1651 1652
    }

L
Liu Jicong 已提交
1653 1654 1655 1656 1657
    if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) {
      taosFreeQitem(rspWrapper);
      terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
      return NULL;
    } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
L
Liu Jicong 已提交
1658
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
L
Liu Jicong 已提交
1659
      /*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/
1660
      int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
L
Liu Jicong 已提交
1661
      if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) {
L
Liu Jicong 已提交
1662
        SMqClientVg* pVg = pollRspWrapper->vgHandle;
L
Liu Jicong 已提交
1663
        /*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/
L
Liu Jicong 已提交
1664
        pVg->currentOffsetNew = pollRspWrapper->dataRsp.rspOffset;
X
Xiaoyu Wang 已提交
1665
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
L
Liu Jicong 已提交
1666
        if (pollRspWrapper->dataRsp.blockNum == 0) {
L
Liu Jicong 已提交
1667 1668
          taosFreeQitem(pollRspWrapper);
          rspWrapper = NULL;
L
temp  
Liu Jicong 已提交
1669 1670
          continue;
        }
L
Liu Jicong 已提交
1671
        // build rsp
L
Liu Jicong 已提交
1672
        SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
L
Liu Jicong 已提交
1673
        taosFreeQitem(pollRspWrapper);
L
Liu Jicong 已提交
1674
        return pRsp;
X
Xiaoyu Wang 已提交
1675
      } else {
L
Liu Jicong 已提交
1676 1677 1678 1679 1680 1681 1682
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
                 pollRspWrapper->dataRsp.head.epoch, consumerEpoch);
        taosFreeQitem(pollRspWrapper);
      }
    } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
      int32_t            consumerEpoch = atomic_load_32(&tmq->epoch);
L
Liu Jicong 已提交
1683
      if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) {
L
Liu Jicong 已提交
1684 1685
        SMqClientVg* pVg = pollRspWrapper->vgHandle;
        /*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/
L
Liu Jicong 已提交
1686
        pVg->currentOffsetNew = pollRspWrapper->metaRsp.rspOffsetNew;
L
Liu Jicong 已提交
1687 1688
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        // build rsp
L
Liu Jicong 已提交
1689
        SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper);
L
Liu Jicong 已提交
1690 1691 1692 1693
        taosFreeQitem(pollRspWrapper);
        return pRsp;
      } else {
        tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
L
Liu Jicong 已提交
1694
                 pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
L
Liu Jicong 已提交
1695
        taosFreeQitem(pollRspWrapper);
X
Xiaoyu Wang 已提交
1696 1697
      }
    } else {
L
fix  
Liu Jicong 已提交
1698
      /*printf("handle ep rsp %d\n", rspMsg->head.mqMsgType);*/
X
Xiaoyu Wang 已提交
1699
      bool reset = false;
L
Liu Jicong 已提交
1700 1701
      tmqHandleNoPollRsp(tmq, rspWrapper, &reset);
      taosFreeQitem(rspWrapper);
X
Xiaoyu Wang 已提交
1702
      if (pollIfReset && reset) {
L
add log  
Liu Jicong 已提交
1703
        tscDebug("consumer %ld reset and repoll", tmq->consumerId);
1704
        tmqPollImpl(tmq, timeout);
X
Xiaoyu Wang 已提交
1705 1706 1707 1708 1709
      }
    }
  }
}

1710
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
L
Liu Jicong 已提交
1711 1712
  void*   rspObj;
  int64_t startTime = taosGetTimestampMs();
L
Liu Jicong 已提交
1713

1714 1715 1716
#if 0
  tmqHandleAllDelayedTask(tmq);
  tmqPollImpl(tmq, timeout);
1717
  rspObj = tmqHandleAllRsp(tmq, timeout, false);
L
Liu Jicong 已提交
1718 1719
  if (rspObj) {
    return (TAOS_RES*)rspObj;
L
fix  
Liu Jicong 已提交
1720
  }
1721
#endif
X
Xiaoyu Wang 已提交
1722

L
Liu Jicong 已提交
1723
  // in no topic status also need process delayed task
L
Liu Jicong 已提交
1724
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
1725 1726 1727
    return NULL;
  }

X
Xiaoyu Wang 已提交
1728
  while (1) {
L
Liu Jicong 已提交
1729
    tmqHandleAllDelayedTask(tmq);
1730
    if (tmqPollImpl(tmq, timeout) < 0) return NULL;
L
Liu Jicong 已提交
1731

1732
    rspObj = tmqHandleAllRsp(tmq, timeout, false);
L
Liu Jicong 已提交
1733 1734
    if (rspObj) {
      return (TAOS_RES*)rspObj;
L
Liu Jicong 已提交
1735 1736
    } else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      return NULL;
X
Xiaoyu Wang 已提交
1737
    }
1738
    if (timeout != -1) {
X
Xiaoyu Wang 已提交
1739
      int64_t endTime = taosGetTimestampMs();
1740
      int64_t leftTime = endTime - startTime;
1741
      if (leftTime > timeout) {
L
Liu Jicong 已提交
1742
        tscDebug("consumer %ld (epoch %d) timeout, no rsp", tmq->consumerId, tmq->epoch);
X
Xiaoyu Wang 已提交
1743 1744
        return NULL;
      }
1745
      tsem_timewait(&tmq->rspSem, leftTime * 1000);
L
Liu Jicong 已提交
1746 1747 1748
    } else {
      // use tsem_timewait instead of tsem_wait to avoid unexpected stuck
      tsem_timewait(&tmq->rspSem, 500 * 1000);
X
Xiaoyu Wang 已提交
1749 1750 1751 1752
    }
  }
}

L
Liu Jicong 已提交
1753
int32_t tmq_consumer_close(tmq_t* tmq) {
1754
  if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
L
Liu Jicong 已提交
1755 1756
    int32_t rsp = tmq_commit_sync(tmq, NULL);
    if (rsp != 0) {
L
Liu Jicong 已提交
1757
      return rsp;
1758 1759 1760 1761
    }

    tmq_list_t* lst = tmq_list_new();
    rsp = tmq_subscribe(tmq, lst);
1762
    tmq_list_destroy(lst);
1763

L
Liu Jicong 已提交
1764
    if (rsp != 0) {
L
Liu Jicong 已提交
1765
      return rsp;
1766
    }
L
Liu Jicong 已提交
1767
  }
1768
  // TODO: free resources
L
Liu Jicong 已提交
1769
  return 0;
1770
}
L
Liu Jicong 已提交
1771

L
Liu Jicong 已提交
1772 1773
const char* tmq_err2str(int32_t err) {
  if (err == 0) {
L
Liu Jicong 已提交
1774
    return "success";
L
Liu Jicong 已提交
1775
  } else if (err == -1) {
L
Liu Jicong 已提交
1776 1777 1778
    return "fail";
  } else {
    return tstrerror(err);
L
Liu Jicong 已提交
1779 1780
  }
}
L
Liu Jicong 已提交
1781

L
Liu Jicong 已提交
1782 1783 1784 1785 1786 1787 1788 1789 1790 1791
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    return TMQ_RES_DATA;
  } else if (TD_RES_TMQ_META(res)) {
    return TMQ_RES_TABLE_META;
  } else {
    return TMQ_RES_INVALID;
  }
}

L
Liu Jicong 已提交
1792
const char* tmq_get_topic_name(TAOS_RES* res) {
L
Liu Jicong 已提交
1793 1794
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
L
Liu Jicong 已提交
1795
    return strchr(pRspObj->topic, '.') + 1;
L
Liu Jicong 已提交
1796 1797 1798
  } else if (TD_RES_TMQ_META(res)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
    return strchr(pMetaRspObj->topic, '.') + 1;
L
Liu Jicong 已提交
1799 1800 1801 1802 1803
  } else {
    return NULL;
  }
}

L
Liu Jicong 已提交
1804 1805 1806 1807
const char* tmq_get_db_name(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
    return strchr(pRspObj->db, '.') + 1;
L
Liu Jicong 已提交
1808 1809 1810
  } else if (TD_RES_TMQ_META(res)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
    return strchr(pMetaRspObj->db, '.') + 1;
L
Liu Jicong 已提交
1811 1812 1813 1814 1815
  } else {
    return NULL;
  }
}

L
Liu Jicong 已提交
1816 1817 1818 1819
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
    return pRspObj->vgId;
L
Liu Jicong 已提交
1820 1821 1822
  } else if (TD_RES_TMQ_META(res)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
    return pMetaRspObj->vgId;
L
Liu Jicong 已提交
1823 1824 1825 1826
  } else {
    return -1;
  }
}
L
Liu Jicong 已提交
1827 1828 1829 1830 1831 1832 1833 1834

const char* tmq_get_table_name(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
    if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 ||
        pRspObj->resIter >= pRspObj->rsp.blockNum) {
      return NULL;
    }
1835
    return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter);
L
Liu Jicong 已提交
1836 1837 1838
  }
  return NULL;
}
1839

1840
int32_t tmq_get_raw_meta(TAOS_RES* res, void** raw_meta, int32_t* raw_meta_len) {
L
Liu Jicong 已提交
1841 1842 1843 1844 1845 1846 1847 1848 1849
  if (TD_RES_TMQ_META(res)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
    *raw_meta = pMetaRspObj->metaRsp.metaRsp;
    *raw_meta_len = pMetaRspObj->metaRsp.metaRspLen;
    return 0;
  }
  return -1;
}

L
Liu Jicong 已提交
1850 1851
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
  tmqCommitInner2(tmq, msg, 0, 1, cb, param);
L
Liu Jicong 已提交
1852 1853
}

L
Liu Jicong 已提交
1854
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) { return tmqCommitInner2(tmq, msg, 0, 0, NULL, NULL); }