tmq.c 44.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include <errno.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
#include "tdatablock.h"
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
#include "tqueue.h"
#include "tref.h"
#include "ttimer.h"

L
Liu Jicong 已提交
27 28 29 30 31 32 33 34
// Forward declaration; the definition is not in this part of the file.
// Below it is called with async=true from the heartbeat task and with async=false from tmq_subscribe(),
// which retries while the result equals TSDB_CODE_MND_CONSUMER_NOT_READY.
int32_t tmqAskEp(tmq_t* tmq, bool async);

// Process-wide TMQ bookkeeping shared by every consumer.
typedef struct {
  int8_t inited;  // 0 until the first tmq_consumer_new() claims initialization via compare-and-exchange
  tmr_h  timer;   // handle returned by taosTmrInit(..., "TMQ"); passed to taosTmrStart/taosTmrReset
} SMqMgmt;

// The single, zero-initialized instance; set up lazily by tmq_consumer_new().
static SMqMgmt tmqMgmt = {0};
35

L
Liu Jicong 已提交
36 37 38 39 40 41
// Common leading fields of the response wrappers; SMqAskEpRspWrapper and SMqPollRspWrapper
// begin with the same two members. tmqClearUnhandleMsg() reads queue items through this type.
typedef struct {
  int8_t  tmqRspType;
  int32_t epoch;
} SMqRspWrapper;

typedef struct {
L
Liu Jicong 已提交
42 43 44
  int8_t      tmqRspType;
  int32_t     epoch;
  SMqAskEpRsp msg;
L
Liu Jicong 已提交
45 46
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
47
struct tmq_list_t {
L
Liu Jicong 已提交
48
  SArray container;
L
Liu Jicong 已提交
49
};
L
Liu Jicong 已提交
50

L
Liu Jicong 已提交
51
struct tmq_topic_vgroup_t {
L
Liu Jicong 已提交
52
  SMqOffset offset;
L
Liu Jicong 已提交
53 54 55
};

struct tmq_topic_vgroup_list_t {
L
Liu Jicong 已提交
56
  SArray container;  // SArray<tmq_topic_vgroup_t*>
L
Liu Jicong 已提交
57 58 59
};

struct tmq_conf_t {
60 61 62 63 64 65 66 67 68 69 70
  char     clientId[256];
  char     groupId[TSDB_CGROUP_LEN];
  int8_t   autoCommit;
  int8_t   resetOffset;
  int8_t   withTbName;
  uint16_t port;
  int32_t  autoCommitInterval;
  char*    ip;
  char*    user;
  char*    pass;
  /*char*          db;*/
71
  tmq_commit_cb* commitCb;
L
Liu Jicong 已提交
72
  void*          commitCbUserParam;
L
Liu Jicong 已提交
73 74 75
};

struct tmq_t {
L
Liu Jicong 已提交
76
  // conf
L
Liu Jicong 已提交
77 78
  char           groupId[TSDB_CGROUP_LEN];
  char           clientId[256];
79
  int8_t         withTbName;
L
Liu Jicong 已提交
80
  int8_t         autoCommit;
L
Liu Jicong 已提交
81
  int32_t        autoCommitInterval;
L
Liu Jicong 已提交
82
  int32_t        resetOffsetCfg;
L
Liu Jicong 已提交
83
  int64_t        consumerId;
L
Liu Jicong 已提交
84 85
  tmq_commit_cb* commitCb;
  void*          commitCbUserParam;
L
Liu Jicong 已提交
86 87 88 89

  // status
  int8_t  status;
  int32_t epoch;
L
Liu Jicong 已提交
90 91
#if 0
  int8_t  epStatus;
L
Liu Jicong 已提交
92
  int32_t epSkipCnt;
L
Liu Jicong 已提交
93
#endif
L
Liu Jicong 已提交
94 95
  int64_t pollCnt;

L
Liu Jicong 已提交
96 97 98 99 100
  // timer
  tmr_h hbTimer;
  tmr_h reportTimer;
  tmr_h commitTimer;

L
Liu Jicong 已提交
101 102 103 104
  // connection
  STscObj* pTscObj;

  // container
L
Liu Jicong 已提交
105
  SArray*     clientTopics;  // SArray<SMqClientTopic>
L
Liu Jicong 已提交
106
  STaosQueue* mqueue;        // queue of rsp
L
Liu Jicong 已提交
107
  STaosQall*  qall;
L
Liu Jicong 已提交
108 109 110 111
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit

  // ctl
  tsem_t rspSem;
L
Liu Jicong 已提交
112 113
};

X
Xiaoyu Wang 已提交
114 115 116 117 118 119 120 121
// Values for a vgroup's status field (presumably SMqClientVg.vgStatus — its use is not shown here).
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT,
};

// Values for tmq_t.status; tmq_consumer_new() starts every consumer in INIT.
enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY,
  TMQ_CONSUMER_STATUS__NO_TOPIC,
};

L
Liu Jicong 已提交
125 126 127 128 129 130
// Task tags written to tmq_t.delayedTask by the timer callbacks and dispatched by tmqHandleAllDelayedTask().
enum {
  TMQ_DELAYED_TASK__HB = 1,
  TMQ_DELAYED_TASK__REPORT,
  TMQ_DELAYED_TASK__COMMIT,
};

L
Liu Jicong 已提交
131
typedef struct {
132 133 134
  // statistics
  int64_t pollCnt;
  // offset
135
  int64_t committedOffset;
136
  int64_t currentOffset;
L
Liu Jicong 已提交
137
  // connection info
138
  int32_t vgId;
X
Xiaoyu Wang 已提交
139
  int32_t vgStatus;
L
Liu Jicong 已提交
140
  int32_t vgSkipCnt;
141 142 143
  SEpSet  epSet;
} SMqClientVg;

L
Liu Jicong 已提交
144
typedef struct {
145
  // subscribe info
L
Liu Jicong 已提交
146
  char* topicName;
L
Liu Jicong 已提交
147
  char  db[TSDB_DB_FNAME_LEN];
L
Liu Jicong 已提交
148 149 150

  SArray* vgs;  // SArray<SMqClientVg>

L
Liu Jicong 已提交
151 152
  int8_t         isSchemaAdaptive;
  SSchemaWrapper schema;
153 154
} SMqClientTopic;

L
Liu Jicong 已提交
155 156 157 158 159
typedef struct {
  int8_t          tmqRspType;
  int32_t         epoch;
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
L
Liu Jicong 已提交
160
  SMqDataBlkRsp   msg;
L
Liu Jicong 已提交
161 162
} SMqPollRspWrapper;

L
Liu Jicong 已提交
163
typedef struct {
L
Liu Jicong 已提交
164 165 166 167
  tmq_t*         tmq;
  tsem_t         rspSem;
  tmq_resp_err_t rspErr;
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
168

L
Liu Jicong 已提交
169
typedef struct {
170
  tmq_t*  tmq;
L
Liu Jicong 已提交
171
  int32_t code;
L
Liu Jicong 已提交
172
  int32_t async;
X
Xiaoyu Wang 已提交
173
  tsem_t  rspSem;
174 175
} SMqAskEpCbParam;

L
Liu Jicong 已提交
176
typedef struct {
L
Liu Jicong 已提交
177 178
  tmq_t*          tmq;
  SMqClientVg*    pVg;
L
Liu Jicong 已提交
179
  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
180
  int32_t         epoch;
L
Liu Jicong 已提交
181
  int32_t         vgId;
L
Liu Jicong 已提交
182
  tsem_t          rspSem;
X
Xiaoyu Wang 已提交
183
} SMqPollCbParam;
184

L
Liu Jicong 已提交
185
typedef struct {
L
Liu Jicong 已提交
186
  tmq_t*         tmq;
L
Liu Jicong 已提交
187 188
  int8_t         async;
  int8_t         automatic;
L
Liu Jicong 已提交
189
  int8_t         freeOffsets;
L
Liu Jicong 已提交
190
  tmq_commit_cb* userCb;
L
Liu Jicong 已提交
191 192
  tsem_t         rspSem;
  tmq_resp_err_t rspErr;
L
Liu Jicong 已提交
193
  SArray*        offsets;
L
Liu Jicong 已提交
194
  void*          userParam;
L
Liu Jicong 已提交
195
} SMqCommitCbParam;
L
Liu Jicong 已提交
196

197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216
// State shared by all per-vgroup requests issued by one tmqComitInner2() call.
typedef struct {
  tmq_t*         tmq;
  int8_t         automatic;
  int8_t         async;
  int8_t         freeOffsets;
  int8_t         waitingRspNum;  // decremented atomically by tmqCommitCb2() for every response
  int8_t         totalRspNum;
  tmq_resp_err_t rspErr;
  tmq_commit_cb* userCb;
  SArray*        successfulOffsets;  // STqOffset* entries pushed by tmqCommitCb2()
  SArray*        failedOffsets;      // STqOffset* entries pushed by tmqCommitCb2()
  void*          userParam;
  tsem_t         rspSem;  // tmqComitInner2() waits on it in synchronous mode
} SMqCommitCbParamSet;

// Per-request callback argument for tmqCommitCb2(): the shared set plus the offset this request carries.
typedef struct {
  SMqCommitCbParamSet* params;
  STqOffset*           pOffset;
} SMqCommitCbParam2;

217
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
218
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
219
  conf->withTbName = false;
L
Liu Jicong 已提交
220
  conf->autoCommit = true;
L
Liu Jicong 已提交
221
  conf->autoCommitInterval = 5000;
X
Xiaoyu Wang 已提交
222
  conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
223 224 225
  return conf;
}

L
Liu Jicong 已提交
226
/**
 * Releases a configuration created by tmq_conf_new(), including the ip/user/pass copies
 * made by tmq_conf_set(). Passing NULL is a no-op.
 */
void tmq_conf_destroy(tmq_conf_t* conf) {
  if (conf == NULL) {
    return;
  }
  if (conf->ip) taosMemoryFree(conf->ip);
  if (conf->user) taosMemoryFree(conf->user);
  if (conf->pass) taosMemoryFree(conf->pass);
  taosMemoryFree(conf);
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
236 237
  if (strcmp(key, "group.id") == 0) {
    strcpy(conf->groupId, value);
L
Liu Jicong 已提交
238
    return TMQ_CONF_OK;
239
  }
L
Liu Jicong 已提交
240

241 242
  if (strcmp(key, "client.id") == 0) {
    strcpy(conf->clientId, value);
L
Liu Jicong 已提交
243 244
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
245

L
Liu Jicong 已提交
246 247
  if (strcmp(key, "enable.auto.commit") == 0) {
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
248
      conf->autoCommit = true;
L
Liu Jicong 已提交
249 250
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
251
      conf->autoCommit = false;
L
Liu Jicong 已提交
252 253 254 255
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
256
  }
L
Liu Jicong 已提交
257

L
Liu Jicong 已提交
258 259 260 261 262
  if (strcmp(key, "auto.commit.interval.ms") == 0) {
    conf->autoCommitInterval = atoi(value);
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
263 264 265 266 267 268 269 270 271 272 273 274 275 276
  if (strcmp(key, "auto.offset.reset") == 0) {
    if (strcmp(value, "none") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "earliest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
277

278 279
  if (strcmp(key, "msg.with.table.name") == 0) {
    if (strcmp(value, "true") == 0) {
280
      conf->withTbName = true;
L
Liu Jicong 已提交
281
      return TMQ_CONF_OK;
282
    } else if (strcmp(value, "false") == 0) {
283
      conf->withTbName = false;
L
Liu Jicong 已提交
284
      return TMQ_CONF_OK;
285 286 287 288 289
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
290
  if (strcmp(key, "td.connect.ip") == 0) {
L
Liu Jicong 已提交
291 292 293
    conf->ip = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
294
  if (strcmp(key, "td.connect.user") == 0) {
L
Liu Jicong 已提交
295 296 297
    conf->user = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
298
  if (strcmp(key, "td.connect.pass") == 0) {
L
Liu Jicong 已提交
299 300 301
    conf->pass = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
302
  if (strcmp(key, "td.connect.port") == 0) {
L
Liu Jicong 已提交
303 304 305
    conf->port = atoi(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
306
  if (strcmp(key, "td.connect.db") == 0) {
307
    /*conf->db = strdup(value);*/
L
Liu Jicong 已提交
308 309 310
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
311
  return TMQ_CONF_UNKNOWN;
312 313 314
}

tmq_list_t* tmq_list_new() {
L
Liu Jicong 已提交
315 316
  //
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
317 318
}

L
Liu Jicong 已提交
319 320 321
/**
 * Appends a heap copy of `src` to the list.
 *
 * @return 0 on success, -1 if an argument is NULL, the copy cannot be allocated or the push fails.
 */
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  if (list == NULL || src == NULL) {
    return -1;
  }

  SArray* container = &list->container;
  char*   topic = strdup(src);
  if (topic == NULL) {
    return -1;
  }

  if (taosArrayPush(container, &topic) == NULL) {
    // the array did not take ownership, so the copy would otherwise leak;
    // released the same way tmq_list_destroy() releases stored entries
    taosMemoryFree(topic);
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
326
/**
 * Destroys the list and every topic string stored in it. Passing NULL is a no-op.
 */
void tmq_list_destroy(tmq_list_t* list) {
  if (list == NULL) {
    return;
  }
  SArray* container = &list->container;
  taosArrayDestroyP(container, taosMemoryFree);
}

L
Liu Jicong 已提交
331 332 333 334 335 336 337 338 339 340
/** Returns the number of entries in the list, as reported by taosArrayGetSize(). */
int32_t tmq_list_get_size(const tmq_list_t* list) {
  return taosArrayGetSize(&list->container);
}

/** Exposes the list's backing storage (the array's pData) as a char* array; no copy is made. */
char** tmq_list_to_c_array(const tmq_list_t* list) {
  return list->container.pData;
}

L
Liu Jicong 已提交
341 342 343 344
/**
 * Writes "<topicName>:<vg>" into dst.
 *
 * dst must be large enough for the result; no bound is checked.
 *
 * @return the value returned by sprintf (characters written, excluding the terminator).
 */
static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
  const int written = sprintf(dst, "%s:%d", topicName, vg);
  return written;
}

L
Liu Jicong 已提交
345 346
/**
 * Response callback for the request sent by tmqCommitInner().
 *
 * Synchronous requests: stores the code and posts rspSem; the waiter owns and frees the param.
 * Asynchronous requests: invokes the auto-commit or user callback (if any), destroys the
 * offsets array when freeOffsets is set, then frees the param.
 *
 * @param param SMqCommitCbParam* built by tmqCommitInner()
 * @param pMsg  unused
 * @param code  response code, stored in rspErr
 * @return always 0
 */
int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
  SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
  pParam->rspErr = code;

  if (!pParam->async) {
    tsem_post(&pParam->rspSem);
    return 0;
  }

  if (pParam->automatic && pParam->tmq->commitCb) {
    pParam->tmq->commitCb(pParam->tmq, pParam->rspErr, (tmq_topic_vgroup_list_t*)pParam->offsets,
                          pParam->tmq->commitCbUserParam);
  } else if (!pParam->automatic && pParam->userCb) {
    pParam->userCb(pParam->tmq, pParam->rspErr, (tmq_topic_vgroup_list_t*)pParam->offsets, pParam->userParam);
  }

  if (pParam->freeOffsets) {
    taosArrayDestroy(pParam->offsets);
  }

  taosMemoryFree(pParam);
  return 0;
}

367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495
/**
 * Response callback for one per-vgroup request sent by tmqComitInner2().
 *
 * Files the request's offset under successfulOffsets (code == 0) or failedOffsets (otherwise),
 * records a failing code in the shared rspErr, frees the per-request param and counts the
 * response. When the last response arrives:
 *  - asynchronous: runs the auto-commit or user callback, then releases the shared set;
 *  - synchronous: posts rspSem and leaves the shared set to the waiter, which tears it down.
 *
 * @param param SMqCommitCbParam2* built by tmqComitInner2()
 * @param pBuf  unused
 * @param code  response code for this request
 * @return always 0
 */
int32_t tmqCommitCb2(void* param, const SDataBuf* pBuf, int32_t code) {
  SMqCommitCbParam2*   pParam = (SMqCommitCbParam2*)param;
  SMqCommitCbParamSet* pParamSet = pParam->params;

  // the two arrays were swapped before: successes landed in failedOffsets and vice versa
  if (code == 0) {
    taosArrayPush(pParamSet->successfulOffsets, &pParam->pOffset);
  } else {
    // NOTE(review): unsynchronized if callbacks can run concurrently — confirm the transport's threading
    pParamSet->rspErr = code;
    taosArrayPush(pParamSet->failedOffsets, &pParam->pOffset);
  }

  // the offset is now owned by one of the arrays; the wrapper itself is no longer needed
  taosMemoryFree(pParam);

  // count down waiting rsp
  int8_t waitingRspNum = atomic_sub_fetch_8(&pParamSet->waitingRspNum, 1);
  ASSERT(waitingRspNum >= 0);
  if (waitingRspNum != 0) {
    return 0;
  }

  if (!pParamSet->async) {
    // the synchronous caller is blocked on rspSem and destroys the arrays itself
    tsem_post(&pParamSet->rspSem);
    return 0;
  }

  // call async cb func
  if (pParamSet->automatic && pParamSet->tmq->commitCb) {
    pParamSet->tmq->commitCb(pParamSet->tmq, pParamSet->rspErr, NULL, pParamSet->tmq->commitCbUserParam);
  } else if (!pParamSet->automatic && pParamSet->userCb) {
    pParamSet->userCb(pParamSet->tmq, pParamSet->rspErr, NULL, pParamSet->userParam);
  }

  taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
  taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
  tsem_destroy(&pParamSet->rspSem);
  taosMemoryFree(pParamSet);
  return 0;
}

int32_t tmqComitInner2(tmq_t* tmq, int8_t automatic, int8_t async, tmq_commit_cb* userCb, void* userParam) {
  int32_t code = -1;

  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pParamSet->tmq = tmq;
  pParamSet->automatic = automatic;
  pParamSet->async = async;
  pParamSet->freeOffsets = 1;
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, i);
      STqOffset*   pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
      if (pOffset == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return -1;
      }
      int32_t tlen = strlen(tmq->groupId);
      memcpy(pOffset->subKey, tmq->groupId, tlen);
      pOffset->subKey[tlen] = TMQ_SEPARATOR;
      strcpy(pOffset->subKey + tlen + 1, pTopic->topicName);
      int32_t len;
      int32_t code;
      tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
      if (code < 0) {
        ASSERT(0);
      }
      void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
      ((SMsgHead*)buf)->vgId = htonl(pVg->vgId);

      void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

      SEncoder encoder;
      tEncoderInit(&encoder, abuf, len);
      tEncodeSTqOffset(&encoder, pOffset);

      // build param
      SMqCommitCbParam2* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam2));
      pParam->params = pParamSet;
      pParam->pOffset = pOffset;

      // build send info
      SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
      if (pMsgSendInfo == NULL) {
        // TODO
        continue;
      }
      pMsgSendInfo->msgInfo = (SDataBuf){
          .pData = buf,
          .len = len,
          .handle = NULL,
      };

      pMsgSendInfo->requestId = generateRequestId();
      pMsgSendInfo->requestObjRefId = 0;
      pMsgSendInfo->param = pParam;
      pMsgSendInfo->fp = tmqCommitCb2;
      pMsgSendInfo->msgType = TDMT_MND_MQ_COMMIT_OFFSET;
      // send msg

      SEpSet  epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
      int64_t transporterId = 0;
      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, pMsgSendInfo);
    }
  }

  if (!async) {
    tsem_wait(&pParamSet->rspSem);
    code = pParamSet->rspErr;
    tsem_destroy(&pParamSet->rspSem);
  } else {
    code = 0;
  }

  if (code != 0 && async) {
    if (automatic) {
      tmq->commitCb(tmq, code, NULL, tmq->commitCbUserParam);
    } else {
      userCb(tmq, code, NULL, userParam);
    }
  }

  if (!async) {
    taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
    taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
  }

  return 0;
}

L
Liu Jicong 已提交
496 497 498 499 500 501 502
/**
 * Commits offsets with a single request; the response is handled by tmqCommitCb().
 *
 * @param tmq       consumer
 * @param offsets   offsets to commit; NULL commits currentOffset of every vgroup of every subscribed topic
 * @param automatic non-zero: failures before sending are reported through tmq->commitCb, else through userCb
 * @param async     non-zero: return after sending; zero: wait for the response and return its code
 * @param userCb    callback for non-automatic asynchronous commits (may be NULL)
 * @param userParam opaque argument for userCb
 * @return 0 on success; the encoder's negative code, -1 for a local allocation/semaphore failure,
 *         or the response code of a synchronous commit
 */
int32_t tmqCommitInner(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8_t automatic, int8_t async,
                       tmq_commit_cb* userCb, void* userParam) {
  SMqCMCommitOffsetReq req;
  SArray*              pOffsets = NULL;
  void*                buf = NULL;
  SMqCommitCbParam*    pParam = NULL;
  SMsgSendInfo*        sendInfo = NULL;
  int8_t               freeOffsets = 0;
  int8_t               semInited = 0;
  int32_t              tlen = 0;
  int32_t              code = -1;

  if (offsets == NULL) {
    freeOffsets = 1;
    pOffsets = taosArrayInit(0, sizeof(SMqOffset));
    if (pOffsets == NULL) goto END;
    for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
      SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
      for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
        SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
        SMqOffset    offset;
        tstrncpy(offset.topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
        tstrncpy(offset.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
        offset.vgId = pVg->vgId;
        offset.offset = pVg->currentOffset;
        taosArrayPush(pOffsets, &offset);
      }
    }
  } else {
    freeOffsets = 0;
    pOffsets = (SArray*)&offsets->container;
  }

  req.num = (int32_t)pOffsets->size;
  req.offsets = pOffsets->pData;

  // first pass without a buffer only measures the encoded size
  SEncoder encoder;
  tEncoderInit(&encoder, NULL, 0);
  code = tEncodeSMqCMCommitOffsetReq(&encoder, &req);
  tlen = encoder.pos;
  tEncoderClear(&encoder);
  if (code < 0) {
    goto END;
  }

  // from here on a failure is local; without this reset such failures were reported as success
  code = -1;

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    goto END;
  }

  tEncoderInit(&encoder, buf, tlen);
  tEncodeSMqCMCommitOffsetReq(&encoder, &req);
  tEncoderClear(&encoder);

  pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
  if (pParam == NULL) {
    goto END;
  }
  pParam->tmq = tmq;
  pParam->automatic = automatic;
  pParam->async = async;
  pParam->offsets = pOffsets;
  pParam->freeOffsets = freeOffsets;
  pParam->userCb = userCb;
  pParam->userParam = userParam;
  if (!async) {
    if (tsem_init(&pParam->rspSem, 0, 0) != 0) goto END;
    semInited = 1;
  }

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) goto END;
  sendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqCommitCb;
  sendInfo->msgType = TDMT_MND_MQ_COMMIT_OFFSET;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  // avoid double free if msg is sent
  buf = NULL;

  if (!async) {
    tsem_wait(&pParam->rspSem);
    code = pParam->rspErr;
    // pParam is still ours in sync mode; it is released at END
  } else {
    code = 0;
    // in async mode tmqCommitCb frees pParam (and the offsets when freeOffsets is set)
    pParam = NULL;
  }

END:
  if (buf) taosMemoryFree(buf);
  if (pParam) {
    if (semInited) tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  }

  if (code != 0 && async) {
    // only reachable when nothing was sent, so the response callback will not run
    tmq_commit_cb* cb = automatic ? tmq->commitCb : userCb;
    void*          cbParam = automatic ? tmq->commitCbUserParam : userParam;
    if (cb) cb(tmq, code, (tmq_topic_vgroup_list_t*)pOffsets, cbParam);
  }

  // a successful async commit leaves the array to tmqCommitCb; every other path releases it here
  if (freeOffsets && pOffsets && (!async || code != 0)) {
    taosArrayDestroy(pOffsets);
  }
  return code;
}

L
Liu Jicong 已提交
611 612
void tmqAssignDelayedHbTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
613
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
L
Liu Jicong 已提交
614 615
  *pTaskType = TMQ_DELAYED_TASK__HB;
  taosWriteQitem(tmq->delayedTask, pTaskType);
616
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
617 618 619 620
}

void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
621
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
L
Liu Jicong 已提交
622 623
  *pTaskType = TMQ_DELAYED_TASK__COMMIT;
  taosWriteQitem(tmq->delayedTask, pTaskType);
624
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
625 626 627 628
}

void tmqAssignDelayedReportTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
629
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
L
Liu Jicong 已提交
630 631
  *pTaskType = TMQ_DELAYED_TASK__REPORT;
  taosWriteQitem(tmq->delayedTask, pTaskType);
632
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
633 634 635 636 637 638 639 640 641 642 643
}

/**
 * Drains tmq->delayedTask and runs every queued task:
 *  - HB: asks for endpoints asynchronously and re-arms the heartbeat timer (1000 ms);
 *  - COMMIT: starts an automatic asynchronous commit and re-arms the commit timer;
 *  - REPORT: currently nothing.
 *
 * @return 0 on success, -1 if the temporary qall cannot be allocated
 */
int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
  STaosQall* qall = taosAllocateQall();
  if (qall == NULL) {
    return -1;
  }

  taosReadAllQitems(tmq->delayedTask, qall);
  for (;;) {
    int8_t* pTaskType = NULL;
    taosGetQitem(qall, (void**)&pTaskType);
    if (pTaskType == NULL) break;

    if (*pTaskType == TMQ_DELAYED_TASK__HB) {
      tmqAskEp(tmq, true);
      taosTmrReset(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer, &tmq->hbTimer);
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
      tmqCommitInner(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam);
      taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer, &tmq->commitTimer);
    } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
      // no report work yet
    } else {
      ASSERT(0);
    }
    taosFreeQitem(pTaskType);
  }
  taosFreeQall(qall);
  return 0;
}

L
Liu Jicong 已提交
659
/**
 * Discards every pending response: first what is already staged in tmq->qall, then whatever
 * is still waiting in tmq->mqueue.
 */
void tmqClearUnhandleMsg(tmq_t* tmq) {
  // pass 0 drains qall as-is; pass 1 refills it from mqueue and drains again
  for (int32_t pass = 0; pass < 2; pass++) {
    if (pass == 1) {
      taosReadAllQitems(tmq->mqueue, tmq->qall);
    }
    for (;;) {
      // reset on every iteration: taosGetQitem is not shown here, and if it leaves the out-pointer
      // untouched on an empty qall the stale value would be freed a second time
      SMqRspWrapper* msg = NULL;
      taosGetQitem(tmq->qall, (void**)&msg);
      if (msg == NULL) break;
      taosFreeQitem(msg);
    }
  }
}

L
Liu Jicong 已提交
680 681 682
/**
 * Response callback for the subscribe request: stores the code and wakes the waiter.
 *
 * @param param SMqSubscribeCbParam* owned by the waiting tmq_subscribe() call
 * @param pMsg  unused
 * @param code  response code, stored in rspErr
 * @return always 0
 */
int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) {
  SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
  pParam->rspErr = code;
  /*tmq_t* tmq = pParam->tmq;*/
  tsem_post(&pParam->rspSem);
  return 0;
}
687

X
Xiaoyu Wang 已提交
688 689 690 691 692
/**
 * Appends the names of the currently subscribed topics to *topics, creating the list when
 * *topics is NULL. The reported name is the part after the first '.' of the stored name,
 * or the whole stored name when it contains no '.'.
 *
 * @return TMQ_RESP_ERR__SUCCESS, or -1 (the failure value tmq_subscribe() also returns) when
 *         an argument is NULL or the list cannot be created or extended
 */
tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  if (tmq == NULL || topics == NULL) {
    return (tmq_resp_err_t)-1;
  }
  if (*topics == NULL) {
    *topics = tmq_list_new();
    if (*topics == NULL) {
      return (tmq_resp_err_t)-1;
    }
  }
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
    // strchr returns NULL when there is no '.'; adding 1 to that was undefined behavior
    const char* dot = strchr(topic->topicName, '.');
    const char* shortName = dot ? dot + 1 : topic->topicName;
    if (tmq_list_append(*topics, shortName) != 0) {
      return (tmq_resp_err_t)-1;
    }
  }
  return TMQ_RESP_ERR__SUCCESS;
}

/**
 * Unsubscribes from all topics by subscribing to an empty topic list.
 *
 * @return the result of tmq_subscribe(), or -1 if the temporary list cannot be allocated
 */
tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq) {
  tmq_list_t* lst = tmq_list_new();
  if (lst == NULL) {
    return (tmq_resp_err_t)-1;
  }
  tmq_resp_err_t rsp = tmq_subscribe(tmq, lst);
  tmq_list_destroy(lst);
  return rsp;
}

L
Liu Jicong 已提交
706
#if 0
// Legacy constructor taking an existing connection; compiled out and kept for reference only.
// It names members (epStatus, epSkipCnt, commit_cb) that the live structs no longer provide.
tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  tmq_t* pTmq = taosMemoryCalloc(sizeof(tmq_t), 1);
  if (pTmq == NULL) {
    return NULL;
  }
  pTmq->pTscObj = (STscObj*)conn;
  pTmq->status = 0;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;
  pTmq->epStatus = 0;
  pTmq->epSkipCnt = 0;
  // set conf
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
  pTmq->autoCommit = conf->autoCommit;
  pTmq->commit_cb = conf->commit_cb;
  pTmq->resetOffsetCfg = conf->resetOffset;

  pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1);
  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  if (pTmq->clientTopics == NULL) {
    taosMemoryFree(pTmq);
    return NULL;
  }

  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();

  tsem_init(&pTmq->rspSem, 0, 0);

  return pTmq;
}
#endif
L
Liu Jicong 已提交
740

L
Liu Jicong 已提交
741
/**
 * Creates a consumer from a configuration: initializes the shared TMQ timer on first use,
 * allocates the queues, copies the configuration, assigns a consumer id and opens a connection.
 *
 * @param conf      configuration; its group id must be non-empty (asserted)
 * @param errstr    unused by this implementation
 * @param errstrLen unused by this implementation
 * @return the consumer, or NULL on failure
 */
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  if (conf == NULL) {
    return NULL;
  }

  // init timer
  int8_t inited = atomic_val_compare_exchange_8(&tmqMgmt.inited, 0, 1);
  if (inited == 0) {
    tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
    if (tmqMgmt.timer == NULL) {
      // let a later call retry the initialization
      atomic_store_8(&tmqMgmt.inited, 0);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
  }

  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  ASSERT(user);
  ASSERT(pass);
  ASSERT(conf->groupId[0]);

  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();
  pTmq->delayedTask = taosOpenQueue();

  if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
    goto FAIL;
  }

  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;
  /*pTmq->epStatus = 0;*/
  /*pTmq->epSkipCnt = 0;*/

  // set conf; source and destination buffers are declared with the same sizes
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
  pTmq->withTbName = conf->withTbName;
  pTmq->autoCommit = conf->autoCommit;
  pTmq->autoCommitInterval = conf->autoCommitInterval;
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
  pTmq->resetOffsetCfg = conf->resetOffset;

  // assign consumerId
  pTmq->consumerId = tGenIdPI64();

  // init semaphore
  if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
    goto FAIL;
  }

  // init connection
  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) {
    tsem_destroy(&pTmq->rspSem);
    goto FAIL;
  }

  return pTmq;

FAIL:
  if (pTmq->clientTopics) taosArrayDestroy(pTmq->clientTopics);
  if (pTmq->mqueue) taosCloseQueue(pTmq->mqueue);
  if (pTmq->delayedTask) taosCloseQueue(pTmq->delayedTask);
  if (pTmq->qall) taosFreeQall(pTmq->qall);
  taosMemoryFree(pTmq);
  return NULL;
}

L
Liu Jicong 已提交
817
/**
 * User-initiated commit; forwards to tmqCommitInner() with automatic = 0 and the consumer's
 * own callback and callback argument.
 *
 * @param offsets offsets to commit, or NULL to commit the current offset of every vgroup
 * @param async   non-zero to return without waiting for the response
 * @return the result of tmqCommitInner()
 */
tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) {
  return tmqCommitInner(tmq, offsets, 0, async, tmq->commitCb, tmq->commitCbUserParam);
}

L
Liu Jicong 已提交
821 822 823 824 825 826
/**
 * Subscribes the consumer to the given topics (an empty list unsubscribes).
 *
 * Builds the full topic names, sends the request and waits for the response, then polls
 * tmqAskEp() every 500 ms while it reports TSDB_CODE_MND_CONSUMER_NOT_READY, and finally
 * starts the heartbeat timer and, with auto commit enabled, the commit timer.
 *
 * @return 0 on success, -1 on a local failure, otherwise the response code
 */
tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
  SMsgSendInfo*   sendInfo = NULL;
  SCMSubscribeReq req = {0};
  int32_t         code = -1;

  req.consumerId = tmq->consumerId;
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
  req.topicNames = taosArrayInit(sz, sizeof(void*));
  if (req.topicNames == NULL) goto FAIL;

  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);

    SName name = {0};
    tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));

    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      goto FAIL;
    }
    tNameExtractFullName(&name, topicFName);

    tscDebug("subscribe topic: %s", topicFName);

    if (taosArrayPush(req.topicNames, &topicFName) == NULL) {
      // not stored in the array, so the cleanup below would miss it
      taosMemoryFree(topicFName);
      goto FAIL;
    }
  }

  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto FAIL;

  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) goto FAIL;

  // lives on this stack frame; safe because this function blocks until the callback has posted
  SMqSubscribeCbParam param = {
      .rspErr = TMQ_RESP_ERR__SUCCESS,
      .tmq = tmq,
  };

  if (tsem_init(&param.rspSem, 0, 0) != 0) goto FAIL;

  sendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
  sendInfo->msgType = TDMT_MND_SUBSCRIBE;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  // avoid double free if msg is sent
  buf = NULL;
  sendInfo = NULL;

  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);

  code = param.rspErr;
  if (code != 0) goto FAIL;

  while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
    tscDebug("consumer not ready, retry");
    taosMsleep(500);
  }

  // init hb timer
  if (tmq->hbTimer == NULL) {
    tmq->hbTimer = taosTmrStart(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer);
  }

  // init auto commit timer
  if (tmq->autoCommit && tmq->commitTimer == NULL) {
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer);
  }

  code = 0;
FAIL:
  if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree);
  // both are non-NULL only when the request was never handed to the transport
  if (buf) taosMemoryFree(buf);
  if (sendInfo) taosMemoryFree(sendInfo);
  return code;
}

L
Liu Jicong 已提交
917
/*
 * Store a commit callback and its user parameter on a consumer configuration.
 *
 * conf:  configuration object to update (dereferenced unconditionally).
 * cb:    callback pointer saved in conf->commitCb.
 * param: opaque pointer saved in conf->commitCbUserParam.
 */
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
  conf->commitCbUserParam = param;
  conf->commitCb = cb;
}
922

L
Liu Jicong 已提交
923
#if 0
// Compiled out. Returns the skipLogNum field of the poll response embedded in
// a message, or 0 when no message is given.
int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) {
  return (tmq_message != NULL) ? tmq_message->msg.skipLogNum : 0;
}
#endif
L
Liu Jicong 已提交
930 931

int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
X
Xiaoyu Wang 已提交
932 933
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;
  SMqClientVg*    pVg = pParam->pVg;
L
Liu Jicong 已提交
934
  SMqClientTopic* pTopic = pParam->pTopic;
X
Xiaoyu Wang 已提交
935
  tmq_t*          tmq = pParam->tmq;
L
Liu Jicong 已提交
936 937 938
  int32_t         vgId = pParam->vgId;
  int32_t         epoch = pParam->epoch;
  taosMemoryFree(pParam);
L
Liu Jicong 已提交
939
  if (code != 0) {
L
Liu Jicong 已提交
940 941
    tscWarn("msg discard from vg %d, epoch %d, code:%x", vgId, epoch, code);
    if (pMsg->pData) taosMemoryFree(pMsg->pData);
L
fix txn  
Liu Jicong 已提交
942
    goto CREATE_MSG_FAIL;
L
Liu Jicong 已提交
943 944
  }

X
Xiaoyu Wang 已提交
945 946 947
  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch < tmqEpoch) {
L
Liu Jicong 已提交
948
    // do not write into queue since updating epoch reset
L
Liu Jicong 已提交
949
    tscWarn("msg discard from vg %d since from earlier epoch, rsp epoch %d, current epoch %d", vgId, msgEpoch,
L
Liu Jicong 已提交
950
            tmqEpoch);
951
    tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
952
    taosMemoryFree(pMsg->pData);
X
Xiaoyu Wang 已提交
953 954 955 956
    return 0;
  }

  if (msgEpoch != tmqEpoch) {
L
Liu Jicong 已提交
957
    tscWarn("mismatch rsp from vg %d, epoch %d, current epoch %d", vgId, msgEpoch, tmqEpoch);
X
Xiaoyu Wang 已提交
958 959
  }

960
  SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
L
Liu Jicong 已提交
961
  if (pRspWrapper == NULL) {
L
Liu Jicong 已提交
962 963
    taosMemoryFree(pMsg->pData);
    tscWarn("msg discard from vg %d, epoch %d since out of memory", vgId, epoch);
L
fix txn  
Liu Jicong 已提交
964
    goto CREATE_MSG_FAIL;
L
Liu Jicong 已提交
965
  }
L
Liu Jicong 已提交
966

L
Liu Jicong 已提交
967 968 969
  pRspWrapper->tmqRspType = TMQ_MSG_TYPE__POLL_RSP;
  pRspWrapper->vgHandle = pVg;
  pRspWrapper->topicHandle = pTopic;
L
Liu Jicong 已提交
970

L
Liu Jicong 已提交
971 972
  memcpy(&pRspWrapper->msg, pMsg->pData, sizeof(SMqRspHead));

L
Liu Jicong 已提交
973
  tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->msg);
L
Liu Jicong 已提交
974
  taosMemoryFree(pMsg->pData);
L
Liu Jicong 已提交
975

L
Liu Jicong 已提交
976 977
  tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld", tmq->consumerId, pVg->vgId,
           pRspWrapper->msg.reqOffset, pRspWrapper->msg.rspOffset);
L
fix  
Liu Jicong 已提交
978

L
Liu Jicong 已提交
979
  taosWriteQitem(tmq->mqueue, pRspWrapper);
980
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
981

L
Liu Jicong 已提交
982
  return 0;
L
fix txn  
Liu Jicong 已提交
983
CREATE_MSG_FAIL:
L
Liu Jicong 已提交
984
  if (epoch == tmq->epoch) {
L
Liu Jicong 已提交
985 986
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }
987
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
988
  return -1;
989 990
}

L
Liu Jicong 已提交
991
/*
 * Replace the consumer's topic/vgroup table with the one described by an
 * ask-ep response and store the new epoch.
 *
 * For every topic in pRsp a new SMqClientTopic is built. If the consumer
 * already tracks a topic with the same name, the currentOffset of each of its
 * vgroups is carried over to the vgroup with the same id in the new table;
 * otherwise the offset from the response is used.
 *
 * tmq:   consumer to update; clientTopics, status and epoch are written.
 * epoch: epoch stored into tmq->epoch.
 * pRsp:  decoded response; it is only read here.
 *
 * Returns true if at least one vgroup was added to the new table, false if
 * none was or if the initial allocations failed (tmq is untouched then).
 *
 * NOTE(review): only the outer array of the previous table is destroyed; the
 * topicName strings and vgs arrays of the old entries are not released here.
 * tmqPollImpl hands pointers to those entries to in-flight poll callbacks, so
 * confirm the intended lifetime before freeing them.
 */
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
  bool    set = false;
  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
  // Key format is "<topicName>:<vgId>".
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];
  tscDebug("consumer %ld update ep epoch %d to epoch %d, topic num: %d", tmq->consumerId, tmq->epoch, epoch,
           topicNumGet);
  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    return false;
  }
  SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pHash == NULL) {
    taosArrayDestroy(newTopics);
    return false;
  }

  // find topic, build hash
  for (int32_t i = 0; i < topicNumGet; i++) {
    SMqClientTopic topic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
    topic.schema = pTopicEp->schema;
    // The hash only ever holds the offsets of the topic being processed.
    taosHashClear(pHash);
    topic.topicName = strdup(pTopicEp->topic);
    tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);

    tscDebug("consumer %ld update topic: %s", tmq->consumerId, topic.topicName);
    int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
    for (int32_t j = 0; j < topicNumCur; j++) {
      // find old topic
      SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j);
      if (pTopicCur->vgs && strcmp(pTopicCur->topicName, pTopicEp->topic) == 0) {
        int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
        tscDebug("consumer %ld new vg num: %d", tmq->consumerId, vgNumCur);
        if (vgNumCur == 0) break;
        for (int32_t k = 0; k < vgNumCur; k++) {
          SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, k);
          // Bounded formatting: never write past vgKey even for an oversized name.
          snprintf(vgKey, sizeof(vgKey), "%s:%d", topic.topicName, pVgCur->vgId);
          tscDebug("consumer %ld epoch %d vg %d build %s", tmq->consumerId, epoch, pVgCur->vgId, vgKey);
          taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t));
        }
        break;
      }
    }

    int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
    topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
    for (int32_t j = 0; j < vgNumGet; j++) {
      SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
      snprintf(vgKey, sizeof(vgKey), "%s:%d", topic.topicName, pVgEp->vgId);
      int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
      int64_t  offset = pVgEp->offset;
      tscDebug("consumer %ld(epoch %d) original offset of vg %d is %ld", tmq->consumerId, epoch, pVgEp->vgId, offset);
      if (pOffset != NULL) {
        // A locally tracked offset wins over the one in the response.
        offset = *pOffset;
        tscDebug("consumer %ld(epoch %d) receive offset of vg %d, full key is %s", tmq->consumerId, epoch, pVgEp->vgId,
                 vgKey);
      }
      tscDebug("consumer %ld(epoch %d) offset of vg %d updated to %ld", tmq->consumerId, epoch, pVgEp->vgId, offset);
      SMqClientVg clientVg = {
          .pollCnt = 0,
          .currentOffset = offset,
          .vgId = pVgEp->vgId,
          .epSet = pVgEp->epSet,
          .vgStatus = TMQ_VG_STATUS__IDLE,
          .vgSkipCnt = 0,
      };
      taosArrayPush(topic.vgs, &clientVg);
      set = true;
    }
    taosArrayPush(newTopics, &topic);
  }
  if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
  taosHashCleanup(pHash);
  tmq->clientTopics = newTopics;

  if (taosArrayGetSize(tmq->clientTopics) == 0)
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
  else
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);

  atomic_store_32(&tmq->epoch, epoch);
  return set;
}

L
Liu Jicong 已提交
1076
int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
1077
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
L
Liu Jicong 已提交
1078
  tmq_t*           tmq = pParam->tmq;
L
Liu Jicong 已提交
1079
  int8_t           async = pParam->async;
L
Liu Jicong 已提交
1080
  pParam->code = code;
1081
  if (code != 0) {
L
Liu Jicong 已提交
1082
    tscError("consumer %ld get topic endpoint error, not ready, wait:%d", tmq->consumerId, pParam->async);
L
Liu Jicong 已提交
1083
    goto END;
1084
  }
L
Liu Jicong 已提交
1085

L
Liu Jicong 已提交
1086
  // tmq's epoch is monotonically increase,
L
Liu Jicong 已提交
1087
  // so it's safe to discard any old epoch msg.
L
Liu Jicong 已提交
1088
  // Epoch will only increase when received newer epoch ep msg
L
Liu Jicong 已提交
1089 1090
  SMqRspHead* head = pMsg->pData;
  int32_t     epoch = atomic_load_32(&tmq->epoch);
L
temp  
Liu Jicong 已提交
1091
  tscDebug("consumer %ld recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
L
Liu Jicong 已提交
1092 1093
  if (head->epoch <= epoch) {
    goto END;
1094
  }
L
Liu Jicong 已提交
1095

L
Liu Jicong 已提交
1096
  if (!async) {
L
Liu Jicong 已提交
1097 1098
    SMqAskEpRsp rsp;
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
X
Xiaoyu Wang 已提交
1099 1100
    /*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/
    /*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/
1101
    tmqUpdateEp(tmq, head->epoch, &rsp);
L
Liu Jicong 已提交
1102
    tDeleteSMqAskEpRsp(&rsp);
X
Xiaoyu Wang 已提交
1103
  } else {
1104
    SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM);
L
Liu Jicong 已提交
1105
    if (pWrapper == NULL) {
X
Xiaoyu Wang 已提交
1106
      terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
1107 1108
      code = -1;
      goto END;
X
Xiaoyu Wang 已提交
1109
    }
L
Liu Jicong 已提交
1110 1111 1112
    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pWrapper->epoch = head->epoch;
    memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
L
Liu Jicong 已提交
1113
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);
L
Liu Jicong 已提交
1114

L
Liu Jicong 已提交
1115
    taosWriteQitem(tmq->mqueue, pWrapper);
1116
    tsem_post(&tmq->rspSem);
1117
  }
L
Liu Jicong 已提交
1118 1119

END:
L
Liu Jicong 已提交
1120
  /*atomic_store_8(&tmq->epStatus, 0);*/
L
Liu Jicong 已提交
1121
  if (!async) {
L
Liu Jicong 已提交
1122
    tsem_post(&pParam->rspSem);
L
Liu Jicong 已提交
1123 1124
  } else {
    taosMemoryFree(pParam);
L
Liu Jicong 已提交
1125 1126
  }
  return code;
1127 1128
}

L
Liu Jicong 已提交
1129
/*
 * Send an ask-ep request (TDMT_MND_MQ_ASK_EP) for this consumer to the mgmt
 * endpoint set; the response is handled by tmqAskEpCb.
 *
 * tmq:   consumer; consumerId, epoch and groupId are placed in the request.
 * async: false - block on the request's semaphore until tmqAskEpCb posts it,
 *                then return the code the callback stored;
 *        true  - return right after the request is handed to the transport;
 *                the callback then owns and frees the callback parameter.
 *
 * Returns 0 for an async request that was handed off, the callback's code for
 * a sync request, or -1 if an allocation or the semaphore setup failed.
 */
int32_t tmqAskEp(tmq_t* tmq, bool async) {
  int32_t      code = 0;
  int32_t      tlen = sizeof(SMqAskEpReq);
  SMqAskEpReq* req = taosMemoryCalloc(1, tlen);
  if (req == NULL) {
    tscError("failed to malloc get subscribe ep buf");
    return -1;
  }
  req->consumerId = htobe64(tmq->consumerId);
  req->epoch = htonl(tmq->epoch);
  strcpy(req->cgroup, tmq->groupId);

  SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
  if (pParam == NULL) {
    tscError("failed to malloc subscribe param");
    taosMemoryFree(req);
    return -1;
  }
  pParam->tmq = tmq;
  pParam->async = async;
  if (tsem_init(&pParam->rspSem, 0, 0) != 0) {
    // Without a usable semaphore a sync caller could never be woken.
    tscError("failed to init ask ep semaphore");
    taosMemoryFree(pParam);
    taosMemoryFree(req);
    return -1;
  }

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
    taosMemoryFree(req);
    return -1;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = req,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqAskEpCb;
  sendInfo->msgType = TDMT_MND_MQ_ASK_EP;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  tscDebug("consumer %ld ask ep", tmq->consumerId);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  if (!async) {
    // Sync mode: this function still owns pParam, so wait for the callback,
    // read its result and tear the parameter down completely.
    tsem_wait(&pParam->rspSem);
    code = pParam->code;
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  }
  return code;
}

L
Liu Jicong 已提交
1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211
/*
 * Set the current offset of one vgroup of one subscribed topic.
 *
 * tmq:    consumer whose clientTopics table is searched.
 * offset: carries the consumer group, topic name, vgroup id and new offset.
 *
 * Returns TMQ_RESP_ERR__SUCCESS after the matching vgroup's currentOffset was
 * overwritten and tmqClearUnhandleMsg(tmq) was called; TMQ_RESP_ERR__FAIL if
 * the group differs from tmq->groupId or no topic/vgroup pair matches.
 */
tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
  const SMqOffset* pTarget = &offset->offset;
  if (strcmp(pTarget->cgroup, tmq->groupId) != 0) {
    return TMQ_RESP_ERR__FAIL;
  }

  int32_t topicCnt = taosArrayGetSize(tmq->clientTopics);
  for (int32_t t = 0; t < topicCnt; t++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, t);
    if (strcmp(pTopic->topicName, pTarget->topicName) != 0) {
      continue;
    }

    int32_t vgCnt = taosArrayGetSize(pTopic->vgs);
    for (int32_t v = 0; v < vgCnt; v++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, v);
      if (pVg->vgId != pTarget->vgId) {
        continue;
      }
      pVg->currentOffset = pTarget->offset;
      tmqClearUnhandleMsg(tmq);
      return TMQ_RESP_ERR__SUCCESS;
    }
  }
  return TMQ_RESP_ERR__FAIL;
}

1221
/*
 * Allocate and fill a poll request for one vgroup of one topic.
 *
 * The requested offset is the vgroup's currentOffset when it is non-negative;
 * otherwise tmq->resetOffsetCfg is used, unless that is
 * TMQ_CONF__RESET_OFFSET__NONE, in which case no request is built.
 *
 * Returns a zero-initialised, heap-allocated SMqPollReq that the caller must
 * release, or NULL on allocation failure or when no start offset is available.
 */
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
  int64_t startOffset = pVg->currentOffset;
  if (startOffset < 0) {
    if (tmq->resetOffsetCfg == TMQ_CONF__RESET_OFFSET__NONE) {
      tscError("unable to poll since no committed offset but reset offset is set to none");
      return NULL;
    }
    startOffset = tmq->resetOffsetCfg;
  }

  SMqPollReq* pReq = taosMemoryCalloc(1, sizeof(SMqPollReq));
  if (pReq == NULL) {
    return NULL;
  }

  // subKey is "<groupId><TMQ_SEPARATOR><topicName>".
  int32_t groupLen = strlen(tmq->groupId);
  memcpy(pReq->subKey, tmq->groupId, groupLen);
  pReq->subKey[groupLen] = TMQ_SEPARATOR;
  strcpy(&pReq->subKey[groupLen + 1], pTopic->topicName);

  // Only the two header fields are converted with htonl here.
  pReq->head.vgId = htonl(pVg->vgId);
  pReq->head.contLen = htonl(sizeof(SMqPollReq));

  pReq->consumerId = tmq->consumerId;
  pReq->epoch = tmq->epoch;
  pReq->currentOffset = startOffset;
  pReq->timeout = timeout;
  pReq->withTbName = tmq->withTbName;
  pReq->reqId = generateRequestId();
  return pReq;
}

L
Liu Jicong 已提交
1258 1259 1260
/*
 * Build the result object handed to the user from a queued poll response.
 *
 * The topic name, db name and vgroup id are taken from the wrapper's topic and
 * vgroup handles. The decoded response is copied with memcpy, so any pointers
 * inside it are shared with the wrapper's copy rather than duplicated. When
 * the response carries no schema of its own (withSchema == 0), the schema
 * stored on the topic handle is installed on the result.
 *
 * Returns a heap-allocated SMqRspObj, or NULL with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY when the allocation fails.
 */
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  if (pRspObj == NULL) {
    // Previously the NULL result was dereferenced on the next line.
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  // -1 marks "no block selected yet"; tmq_get_table_name rejects negative values.
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->msg, sizeof(SMqDataBlkRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  if (!pWrapper->msg.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  return pRspObj;
}

1276
/*
 * Send one poll request to every vgroup of every subscribed topic that is
 * currently idle.
 *
 * A vgroup is claimed by atomically switching its status from IDLE to WAIT;
 * vgroups in any other status are skipped and only their skip counter grows.
 * Responses are delivered to tmqPollCb.
 *
 * Returns 0 after walking all vgroups, or -1 as soon as building a request or
 * allocating its bookkeeping fails. On failure the claimed vgroup is put back
 * to IDLE and tmq->rspSem is posted; vgroups not yet visited are not polled.
 */
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
  SMqClientVg*    pVg = NULL;
  SMqPollReq*     pReq = NULL;
  SMqPollCbParam* pParam = NULL;

  for (int topicIdx = 0; topicIdx < taosArrayGetSize(tmq->clientTopics); topicIdx++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, topicIdx);

    for (int vgIdx = 0; vgIdx < taosArrayGetSize(pTopic->vgs); vgIdx++) {
      pVg = taosArrayGet(pTopic->vgs, vgIdx);
      // Nothing is owned yet for this vgroup; earlier requests were handed off.
      pReq = NULL;
      pParam = NULL;

      int32_t prevStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
      if (prevStatus != TMQ_VG_STATUS__IDLE) {
        int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
        tscTrace("consumer %ld epoch %d skip vg %d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId, vgSkipCnt);
        continue;
      }
      atomic_store_32(&pVg->vgSkipCnt, 0);

      pReq = tmqBuildConsumeReqImpl(tmq, timeout, pTopic, pVg);
      if (pReq == NULL) goto SEND_FAIL;

      pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
      if (pParam == NULL) goto SEND_FAIL;
      pParam->tmq = tmq;
      pParam->pTopic = pTopic;
      pParam->pVg = pVg;
      pParam->vgId = pVg->vgId;
      pParam->epoch = tmq->epoch;

      SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
      if (sendInfo == NULL) goto SEND_FAIL;

      sendInfo->msgInfo = (SDataBuf){
          .pData = pReq,
          .len = sizeof(SMqPollReq),
          .handle = NULL,
      };
      sendInfo->msgType = TDMT_VND_CONSUME;
      sendInfo->fp = tmqPollCb;
      sendInfo->param = pParam;
      sendInfo->requestId = pReq->reqId;
      sendInfo->requestObjRefId = 0;

      int64_t transporterId = 0;
      tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %ld, reqId %lu", tmq->consumerId,
               pTopic->topicName, pVg->vgId, tmq->epoch, pVg->currentOffset, pReq->reqId);
      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
      pVg->pollCnt++;
      tmq->pollCnt++;
    }
  }
  return 0;

SEND_FAIL:
  // Release whatever was allocated for the vgroup being processed, make it
  // pollable again and wake the consumer.
  if (pReq != NULL) taosMemoryFree(pReq);
  if (pParam != NULL) taosMemoryFree(pParam);
  atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  tsem_post(&tmq->rspSem);
  return -1;
}

L
Liu Jicong 已提交
1349 1350
/*
 * Handle a queued response that is not a poll response.
 *
 * Only TMQ_MSG_TYPE__EP_RSP is understood. If its epoch is newer than the
 * consumer's, the endpoint table is rebuilt with tmqUpdateEp and *pReset is
 * set to true; otherwise *pReset is set to false.
 *
 * Returns 0 for an ep response, -1 for any other type (*pReset is then left
 * unchanged).
 *
 * NOTE(review): the decoded SMqAskEpRsp inside the wrapper is not passed to
 * tDeleteSMqAskEpRsp on this path, unlike the synchronous path in tmqAskEpCb;
 * confirm whether it is released elsewhere.
 */
int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType != TMQ_MSG_TYPE__EP_RSP) {
    return -1;
  }

  bool isNewer = rspWrapper->epoch > atomic_load_32(&tmq->epoch);
  if (isNewer) {
    SMqAskEpRspWrapper* pEpWrapper = (SMqAskEpRspWrapper*)rspWrapper;
    tmqUpdateEp(tmq, rspWrapper->epoch, &pEpWrapper->msg);
  }
  *pReset = isNewer;
  return 0;
}

1367
/*
 * Drain queued responses until one yields data for the user or none are left.
 *
 * Items are taken from tmq->qall, which is refilled from tmq->mqueue when it
 * runs empty.
 *   - Poll response of the current epoch: the vgroup's currentOffset is
 *     advanced to rspOffset and its status set back to IDLE. An empty
 *     response (blockNum == 0) is dropped; otherwise a result object is built
 *     and returned.
 *   - Poll response of another epoch: dropped; the vgroup is not touched.
 *   - Anything else: passed to tmqHandleNoPollRsp; if that reports a reset
 *     and pollIfReset is set, a new round of polls is sent.
 *
 * Returns the result object built by tmqBuildRspFromWrapper, or NULL when
 * both queues are empty.
 */
SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
  for (;;) {
    SMqRspWrapper* pItem = NULL;
    taosGetQitem(tmq->qall, (void**)&pItem);
    if (pItem == NULL) {
      taosReadAllQitems(tmq->mqueue, tmq->qall);
      taosGetQitem(tmq->qall, (void**)&pItem);
      if (pItem == NULL) return NULL;
    }

    if (pItem->tmqRspType != TMQ_MSG_TYPE__POLL_RSP) {
      bool reset = false;
      tmqHandleNoPollRsp(tmq, pItem, &reset);
      taosFreeQitem(pItem);
      if (pollIfReset && reset) {
        tscDebug("consumer %ld reset and repoll", tmq->consumerId);
        tmqPollImpl(tmq, timeout);
      }
      continue;
    }

    SMqPollRspWrapper* pPollRsp = (SMqPollRspWrapper*)pItem;
    int32_t            consumerEpoch = atomic_load_32(&tmq->epoch);
    if (pPollRsp->msg.head.epoch != consumerEpoch) {
      tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n", pPollRsp->msg.head.epoch,
               consumerEpoch);
      taosFreeQitem(pPollRsp);
      continue;
    }

    SMqClientVg* pVg = pPollRsp->vgHandle;
    pVg->currentOffset = pPollRsp->msg.rspOffset;
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

    if (pPollRsp->msg.blockNum == 0) {
      // Nothing to hand to the user; keep draining.
      taosFreeQitem(pPollRsp);
      continue;
    }

    SMqRspObj* pRsp = tmqBuildRspFromWrapper(pPollRsp);
    taosFreeQitem(pPollRsp);
    return pRsp;
  }
}

1413
/*
 * Poll the consumer for the next batch of data.
 *
 * Each round runs the delayed tasks, sends poll requests to idle vgroups and
 * drains the response queue. If nothing is available, the call sleeps on
 * tmq->rspSem (posted by the response callbacks) and tries again.
 *
 * tmq:     consumer handle.
 * timeout: maximum time to wait, in the unit of taosGetTimestampMs();
 *          -1 means wait until data arrives.
 *
 * Returns a result object, or NULL when the consumer is still in INIT status,
 * when sending a poll fails, or when the timeout elapses without data.
 */
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
  // Upper bound for a single sleep, so polls are re-issued periodically even
  // if a wake-up is missed (same bound the no-timeout branch already used).
  const int64_t maxWaitMs = 500;

  SMqRspObj* rspObj;
  int64_t    startTime = taosGetTimestampMs();

  // in no topic status also need process delayed task
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
    return NULL;
  }

  while (1) {
    tmqHandleAllDelayedTask(tmq);
    if (tmqPollImpl(tmq, timeout) < 0) return NULL;

    rspObj = tmqHandleAllRsp(tmq, timeout, false);
    if (rspObj) {
      return (TAOS_RES*)rspObj;
    }

    if (timeout != -1) {
      int64_t elapsed = taosGetTimestampMs() - startTime;
      if (elapsed > timeout) {
        tscDebug("consumer %ld (epoch %d) timeout, no rsp", tmq->consumerId, tmq->epoch);
        return NULL;
      }
      // Sleep for the time that is LEFT, not the time already spent: the old
      // code waited `elapsed`, which spun at the start and could overshoot the
      // deadline by up to `timeout`.
      int64_t remaining = timeout - elapsed;
      int64_t waitMs = (remaining < maxWaitMs) ? remaining : maxWaitMs;
      // The *1000 scaling from milliseconds is kept from the original call.
      tsem_timewait(&tmq->rspSem, waitMs * 1000);
    } else {
      // use tsem_timewait instead of tsem_wait to avoid unexpected stuck
      tsem_timewait(&tmq->rspSem, maxWaitMs * 1000);
    }
  }
}

L
Liu Jicong 已提交
1454
/*
 * Close a consumer.
 *
 * When the consumer is in READY status, offsets are committed synchronously
 * and the consumer then subscribes to a freshly created, empty topic list.
 * In any other status nothing is done.
 *
 * Returns TMQ_RESP_ERR__SUCCESS, or the error reported by the commit or the
 * subscribe step.
 */
tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) {
  // TODO: free resources
  if (tmq->status != TMQ_CONSUMER_STATUS__READY) {
    return TMQ_RESP_ERR__SUCCESS;
  }

  tmq_resp_err_t rsp = tmq_commit_sync(tmq, NULL);
  if (rsp != TMQ_RESP_ERR__SUCCESS) {
    return rsp;
  }

  tmq_list_t* emptyTopics = tmq_list_new();
  rsp = tmq_subscribe(tmq, emptyTopics);
  tmq_list_destroy(emptyTopics);
  if (rsp != TMQ_RESP_ERR__SUCCESS) {
    return rsp;
  }
  return TMQ_RESP_ERR__SUCCESS;
}
L
Liu Jicong 已提交
1472 1473 1474 1475

/*
 * Translate a tmq response code into a message string.
 *
 * Returns "success" for TMQ_RESP_ERR__SUCCESS, "fail" for TMQ_RESP_ERR__FAIL,
 * and whatever tstrerror() returns for any other value.
 */
const char* tmq_err2str(tmq_resp_err_t err) {
  if (err == TMQ_RESP_ERR__SUCCESS) return "success";
  if (err == TMQ_RESP_ERR__FAIL) return "fail";
  return tstrerror(err);
}
L
Liu Jicong 已提交
1482

L
Liu Jicong 已提交
1483
/*
 * Return the topic name of a tmq result with everything up to and including
 * the first '.' removed.
 *
 * res: result handle; anything that is not a tmq result yields NULL.
 *
 * The returned pointer refers to storage inside the result object. If the
 * stored name contains no '.', the whole name is returned; the old code
 * returned an invalid pointer (NULL + 1) in that case.
 */
const char* tmq_get_topic_name(TAOS_RES* res) {
  if (!TD_RES_TMQ(res)) {
    return NULL;
  }

  SMqRspObj*  pRspObj = (SMqRspObj*)res;
  const char* dot = strchr(pRspObj->topic, '.');
  return (dot != NULL) ? dot + 1 : pRspObj->topic;
}

L
Liu Jicong 已提交
1492 1493 1494 1495 1496 1497 1498 1499 1500
/*
 * Return the database name of a tmq result with everything up to and
 * including the first '.' removed.
 *
 * res: result handle; anything that is not a tmq result yields NULL.
 *
 * The returned pointer refers to storage inside the result object. If the
 * stored name contains no '.', the whole name is returned; the old code
 * returned an invalid pointer (NULL + 1) in that case.
 */
const char* tmq_get_db_name(TAOS_RES* res) {
  if (!TD_RES_TMQ(res)) {
    return NULL;
  }

  SMqRspObj*  pRspObj = (SMqRspObj*)res;
  const char* dot = strchr(pRspObj->db, '.');
  return (dot != NULL) ? dot + 1 : pRspObj->db;
}

L
Liu Jicong 已提交
1501 1502 1503 1504 1505 1506 1507 1508
/*
 * Return the vgroup id stored on a tmq result, or -1 if res is not a tmq
 * result.
 */
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (!TD_RES_TMQ(res)) {
    return -1;
  }
  return ((SMqRspObj*)res)->vgId;
}
L
Liu Jicong 已提交
1509 1510 1511 1512 1513 1514 1515 1516

/*
 * Return the table name recorded for the block the result's iterator
 * currently points at.
 *
 * Returns NULL when res is not a tmq result, when the response carries no
 * table names (withTbName unset or blockTbName NULL), or when resIter is
 * outside [0, blockNum).
 */
const char* tmq_get_table_name(TAOS_RES* res) {
  if (!TD_RES_TMQ(res)) {
    return NULL;
  }

  SMqRspObj* pRspObj = (SMqRspObj*)res;
  bool       hasNames = pRspObj->rsp.withTbName && pRspObj->rsp.blockTbName != NULL;
  bool       inRange = pRspObj->resIter >= 0 && pRspObj->resIter < pRspObj->rsp.blockNum;
  if (!(hasNames && inRange)) {
    return NULL;
  }
  return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter);
}
1521 1522

/*
 * Commit offsets without returning a result to the caller.
 *
 * Forwards to tmqCommitInner with the third argument 0 and the fourth
 * argument 1 (tmq_commit_sync passes 0 there; presumably the async flag,
 * confirm against tmqCommitInner), plus the caller's callback and parameter.
 * The value returned by tmqCommitInner is discarded.
 */
void tmq_commit_async(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, tmq_commit_cb* cb, void* param) {
  (void)tmqCommitInner(tmq, offsets, 0, 1, cb, param);
}

1526
/*
 * Commit offsets and return the outcome.
 *
 * Forwards to tmqCommitInner with the third and fourth arguments 0
 * (tmq_commit_async passes 1 as the fourth; presumably the async flag,
 * confirm against tmqCommitInner) and no callback.
 */
tmq_resp_err_t tmq_commit_sync(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets) {
  tmq_resp_err_t rsp = tmqCommitInner(tmq, offsets, 0, 0, NULL, NULL);
  return rsp;
}