tmq.c 39.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
H
Haojun Liao 已提交
19
#include "tdatablock.h"
20 21 22
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
X
Xiaoyu Wang 已提交
23
#include "tqueue.h"
24
#include "tref.h"
L
Liu Jicong 已提交
25 26
#include "ttimer.h"

L
Liu Jicong 已提交
27 28 29 30 31 32 33 34
// Forward declaration. Called by tmqHandleAllDelayedTask() for heartbeat tasks (async = true)
// and by tmq_subscribe() (async = false), which retries while the result equals
// TSDB_CODE_MND_CONSUMER_NOT_READY.
int32_t tmqAskEp(tmq_t* tmq, bool async);

// State shared by every consumer created through this file.
typedef struct {
  int8_t inited;  // 0 until tmq_consumer_new() flips it to 1 with a compare-and-swap
  tmr_h  timer;   // handle returned by taosTmrInit(); consumer timers are started/reset on it
} SMqMgmt;

// The single, zero-initialized instance of the shared state.
static SMqMgmt tmqMgmt = {0};
35

L
Liu Jicong 已提交
36 37 38 39 40 41
// Generic view of a queued response: the two leading fields shared by SMqAskEpRspWrapper and
// SMqPollRspWrapper. tmqClearUnhandleMsg() reads queue items through this type.
typedef struct {
  int8_t  tmqRspType;  // response discriminator, e.g. TMQ_MSG_TYPE__POLL_RSP
  int32_t epoch;
} SMqRspWrapper;

typedef struct {
L
Liu Jicong 已提交
42 43 44
  int8_t      tmqRspType;
  int32_t     epoch;
  SMqAskEpRsp msg;
L
Liu Jicong 已提交
45 46
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
47
struct tmq_list_t {
L
Liu Jicong 已提交
48
  SArray container;
L
Liu Jicong 已提交
49
};
L
Liu Jicong 已提交
50

L
Liu Jicong 已提交
51
struct tmq_topic_vgroup_t {
L
Liu Jicong 已提交
52
  SMqOffset offset;
L
Liu Jicong 已提交
53 54 55
};

struct tmq_topic_vgroup_list_t {
L
Liu Jicong 已提交
56
  SArray container;  // SArray<tmq_topic_vgroup_t*>
L
Liu Jicong 已提交
57 58 59
};

struct tmq_conf_t {
60 61 62 63 64 65 66 67 68 69 70
  char     clientId[256];
  char     groupId[TSDB_CGROUP_LEN];
  int8_t   autoCommit;
  int8_t   resetOffset;
  int8_t   withTbName;
  uint16_t port;
  int32_t  autoCommitInterval;
  char*    ip;
  char*    user;
  char*    pass;
  /*char*          db;*/
71
  tmq_commit_cb* commitCb;
L
Liu Jicong 已提交
72
  void*          commitCbUserParam;
L
Liu Jicong 已提交
73 74 75
};

struct tmq_t {
L
Liu Jicong 已提交
76
  // conf
L
Liu Jicong 已提交
77 78
  char           groupId[TSDB_CGROUP_LEN];
  char           clientId[256];
79
  int8_t         withTbName;
L
Liu Jicong 已提交
80
  int8_t         autoCommit;
L
Liu Jicong 已提交
81
  int32_t        autoCommitInterval;
L
Liu Jicong 已提交
82
  int32_t        resetOffsetCfg;
L
Liu Jicong 已提交
83
  int64_t        consumerId;
L
Liu Jicong 已提交
84 85
  tmq_commit_cb* commitCb;
  void*          commitCbUserParam;
L
Liu Jicong 已提交
86 87 88 89

  // status
  int8_t  status;
  int32_t epoch;
L
Liu Jicong 已提交
90 91
#if 0
  int8_t  epStatus;
L
Liu Jicong 已提交
92
  int32_t epSkipCnt;
L
Liu Jicong 已提交
93
#endif
L
Liu Jicong 已提交
94 95
  int64_t pollCnt;

L
Liu Jicong 已提交
96 97 98 99 100
  // timer
  tmr_h hbTimer;
  tmr_h reportTimer;
  tmr_h commitTimer;

L
Liu Jicong 已提交
101 102 103 104
  // connection
  STscObj* pTscObj;

  // container
L
Liu Jicong 已提交
105
  SArray*     clientTopics;  // SArray<SMqClientTopic>
L
Liu Jicong 已提交
106
  STaosQueue* mqueue;        // queue of rsp
L
Liu Jicong 已提交
107
  STaosQall*  qall;
L
Liu Jicong 已提交
108 109 110 111
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit

  // ctl
  tsem_t rspSem;
L
Liu Jicong 已提交
112 113
};

X
Xiaoyu Wang 已提交
114 115 116 117 118 119 120 121
// Values stored in SMqClientVg.vgStatus. tmqPollCb() resets a vgroup to IDLE when a poll fails.
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT,
};

// Values stored in tmq_t.status; tmq_consumer_new() starts every consumer in INIT.
enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY,
  TMQ_CONSUMER_STATUS__NO_TOPIC,
};

L
Liu Jicong 已提交
125 126 127 128 129 130
// Task kinds written to tmq_t.delayedTask by the timer callbacks and dispatched by
// tmqHandleAllDelayedTask().
enum {
  TMQ_DELAYED_TASK__HB = 1,
  TMQ_DELAYED_TASK__REPORT,
  TMQ_DELAYED_TASK__COMMIT,
};

L
Liu Jicong 已提交
131
typedef struct {
132 133 134 135
  // statistics
  int64_t pollCnt;
  // offset
  int64_t currentOffset;
L
Liu Jicong 已提交
136
  // connection info
137
  int32_t vgId;
X
Xiaoyu Wang 已提交
138
  int32_t vgStatus;
L
Liu Jicong 已提交
139
  int32_t vgSkipCnt;
140 141 142
  SEpSet  epSet;
} SMqClientVg;

L
Liu Jicong 已提交
143
typedef struct {
144
  // subscribe info
L
Liu Jicong 已提交
145
  char* topicName;
L
Liu Jicong 已提交
146
  char  db[TSDB_DB_FNAME_LEN];
L
Liu Jicong 已提交
147 148 149

  SArray* vgs;  // SArray<SMqClientVg>

L
Liu Jicong 已提交
150 151
  int8_t         isSchemaAdaptive;
  SSchemaWrapper schema;
152 153
} SMqClientTopic;

L
Liu Jicong 已提交
154 155 156 157 158
typedef struct {
  int8_t          tmqRspType;
  int32_t         epoch;
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
L
Liu Jicong 已提交
159
  SMqDataBlkRsp   msg;
L
Liu Jicong 已提交
160 161
} SMqPollRspWrapper;

L
Liu Jicong 已提交
162
typedef struct {
L
Liu Jicong 已提交
163 164 165 166
  tmq_t*         tmq;
  tsem_t         rspSem;
  tmq_resp_err_t rspErr;
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
167

L
Liu Jicong 已提交
168
typedef struct {
169
  tmq_t*  tmq;
L
Liu Jicong 已提交
170
  int32_t code;
L
Liu Jicong 已提交
171
  int32_t async;
X
Xiaoyu Wang 已提交
172
  tsem_t  rspSem;
173 174
} SMqAskEpCbParam;

L
Liu Jicong 已提交
175
typedef struct {
L
Liu Jicong 已提交
176 177
  tmq_t*          tmq;
  SMqClientVg*    pVg;
L
Liu Jicong 已提交
178
  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
179
  int32_t         epoch;
L
Liu Jicong 已提交
180
  int32_t         vgId;
L
Liu Jicong 已提交
181
  tsem_t          rspSem;
X
Xiaoyu Wang 已提交
182
} SMqPollCbParam;
183

L
Liu Jicong 已提交
184
typedef struct {
L
Liu Jicong 已提交
185
  tmq_t*         tmq;
L
Liu Jicong 已提交
186 187
  int8_t         async;
  int8_t         automatic;
L
Liu Jicong 已提交
188
  int8_t         freeOffsets;
L
Liu Jicong 已提交
189
  tmq_commit_cb* userCb;
L
Liu Jicong 已提交
190 191
  tsem_t         rspSem;
  tmq_resp_err_t rspErr;
L
Liu Jicong 已提交
192
  SArray*        offsets;
L
Liu Jicong 已提交
193
  void*          userParam;
L
Liu Jicong 已提交
194
} SMqCommitCbParam;
L
Liu Jicong 已提交
195

196
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
197
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
198
  conf->withTbName = false;
L
Liu Jicong 已提交
199
  conf->autoCommit = true;
L
Liu Jicong 已提交
200
  conf->autoCommitInterval = 5000;
X
Xiaoyu Wang 已提交
201
  conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
202 203 204
  return conf;
}

L
Liu Jicong 已提交
205
// Release a configuration together with the connection strings it owns. NULL is ignored.
void tmq_conf_destroy(tmq_conf_t* conf) {
  if (conf == NULL) return;

  if (conf->ip != NULL) taosMemoryFree(conf->ip);
  if (conf->user != NULL) taosMemoryFree(conf->user);
  if (conf->pass != NULL) taosMemoryFree(conf->pass);
  taosMemoryFree(conf);
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
215 216
  if (strcmp(key, "group.id") == 0) {
    strcpy(conf->groupId, value);
L
Liu Jicong 已提交
217
    return TMQ_CONF_OK;
218
  }
L
Liu Jicong 已提交
219

220 221
  if (strcmp(key, "client.id") == 0) {
    strcpy(conf->clientId, value);
L
Liu Jicong 已提交
222 223
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
224

L
Liu Jicong 已提交
225 226
  if (strcmp(key, "enable.auto.commit") == 0) {
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
227
      conf->autoCommit = true;
L
Liu Jicong 已提交
228 229
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
230
      conf->autoCommit = false;
L
Liu Jicong 已提交
231 232 233 234
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
235
  }
L
Liu Jicong 已提交
236

L
Liu Jicong 已提交
237 238 239 240 241
  if (strcmp(key, "auto.commit.interval.ms") == 0) {
    conf->autoCommitInterval = atoi(value);
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
242 243 244 245 246 247 248 249 250 251 252 253 254 255
  if (strcmp(key, "auto.offset.reset") == 0) {
    if (strcmp(value, "none") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "earliest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
256

257 258
  if (strcmp(key, "msg.with.table.name") == 0) {
    if (strcmp(value, "true") == 0) {
259
      conf->withTbName = true;
L
Liu Jicong 已提交
260
      return TMQ_CONF_OK;
261
    } else if (strcmp(value, "false") == 0) {
262
      conf->withTbName = false;
L
Liu Jicong 已提交
263
      return TMQ_CONF_OK;
264 265 266 267 268
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
269
  if (strcmp(key, "td.connect.ip") == 0) {
L
Liu Jicong 已提交
270 271 272
    conf->ip = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
273
  if (strcmp(key, "td.connect.user") == 0) {
L
Liu Jicong 已提交
274 275 276
    conf->user = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
277
  if (strcmp(key, "td.connect.pass") == 0) {
L
Liu Jicong 已提交
278 279 280
    conf->pass = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
281
  if (strcmp(key, "td.connect.port") == 0) {
L
Liu Jicong 已提交
282 283 284
    conf->port = atoi(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
285
  if (strcmp(key, "td.connect.db") == 0) {
286
    /*conf->db = strdup(value);*/
L
Liu Jicong 已提交
287 288 289
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
290
  return TMQ_CONF_UNKNOWN;
291 292 293
}

tmq_list_t* tmq_list_new() {
L
Liu Jicong 已提交
294 295
  //
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
296 297
}

L
Liu Jicong 已提交
298 299 300
// Append a private copy of `src` to the list.
// Returns 0 on success, -1 when an argument is NULL, the copy cannot be allocated, or the
// array cannot grow. On failure the list is unchanged and the copy is released.
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  if (list == NULL || src == NULL) return -1;

  char* topic = strdup(src);
  if (topic == NULL) return -1;

  if (taosArrayPush(&list->container, &topic) == NULL) {
    // The array did not take ownership, so the copy must not leak.
    taosMemoryFree(topic);
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
305
// Destroy the list, releasing every stored topic string with taosMemoryFree.
void tmq_list_destroy(tmq_list_t* list) {
  taosArrayDestroyP(&list->container, taosMemoryFree);
}

L
Liu Jicong 已提交
310 311 312 313 314 315 316 317 318 319
// Number of topic names currently stored in the list.
int32_t tmq_list_get_size(const tmq_list_t* list) {
  return taosArrayGetSize(&list->container);
}

// Expose the list's backing storage as an array of C strings. The pointer is the array's own
// buffer, not a copy.
char** tmq_list_to_c_array(const tmq_list_t* list) {
  return list->container.pData;
}

L
Liu Jicong 已提交
320 321 322 323
// Format "<topicName>:<vg>" into dst and return the number of characters written (terminator
// excluded). The caller provides a buffer large enough for the result.
static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
  int32_t written = sprintf(dst, "%s:%d", topicName, vg);
  return written;
}

L
Liu Jicong 已提交
324 325
int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
  SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
L
Liu Jicong 已提交
326
  pParam->rspErr = code;
L
Liu Jicong 已提交
327 328 329 330 331 332 333 334
  if (pParam->async) {
    if (pParam->automatic && pParam->tmq->commitCb) {
      pParam->tmq->commitCb(pParam->tmq, pParam->rspErr, (tmq_topic_vgroup_list_t*)pParam->offsets,
                            pParam->tmq->commitCbUserParam);
    } else if (!pParam->automatic && pParam->userCb) {
      pParam->userCb(pParam->tmq, pParam->rspErr, (tmq_topic_vgroup_list_t*)pParam->offsets, pParam->userParam);
    }

L
Liu Jicong 已提交
335
    if (pParam->freeOffsets) {
L
Liu Jicong 已提交
336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352
      taosArrayDestroy(pParam->offsets);
    }

    taosMemoryFree(pParam);
  } else {
    tsem_post(&pParam->rspSem);
  }
  return 0;
}

// Encode and send an offset-commit request to the mnode.
//
// offsets   - offsets to commit; NULL means "the current offset of every vgroup of every
//             subscribed topic", collected into a temporary array owned by this call chain.
// automatic - non-zero when triggered by the auto-commit timer (failures are reported to
//             tmq->commitCb rather than userCb).
// async     - zero: wait for the response and return its code; non-zero: return right after
//             the request is handed to the transport, tmqCommitCb() finishes the job.
// userCb / userParam - callback used for non-automatic async commits.
//
// Returns 0 on success (or successful hand-off in async mode), the negative encoder result
// when encoding fails, -1 on resource failure, or the response code in sync mode.
// When an async commit fails before it is sent, the relevant callback (if any) is invoked
// from here; the offsets argument passed to it is NULL if the temporary array could not be
// allocated.
int32_t tmqCommitInner(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8_t automatic, int8_t async,
                       tmq_commit_cb* userCb, void* userParam) {
  SMqCMCommitOffsetReq req = {0};
  SEncoder             encoder;
  SArray*              pOffsets = NULL;
  void*                buf = NULL;
  SMqCommitCbParam*    pParam = NULL;
  SMsgSendInfo*        sendInfo = NULL;
  int8_t               freeOffsets = 0;
  int8_t               semInited = 0;
  int8_t               sent = 0;
  int32_t              tlen = 0;
  int32_t              code = -1;

  if (offsets == NULL) {
    freeOffsets = 1;
    pOffsets = taosArrayInit(0, sizeof(SMqOffset));
    if (pOffsets == NULL) goto END;
    for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
      SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
      for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
        SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
        SMqOffset    offset = {0};
        tstrncpy(offset.topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
        tstrncpy(offset.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
        offset.vgId = pVg->vgId;
        offset.offset = pVg->currentOffset;
        if (taosArrayPush(pOffsets, &offset) == NULL) goto END;
      }
    }
  } else {
    freeOffsets = 0;
    pOffsets = (SArray*)&offsets->container;
  }

  req.num = (int32_t)pOffsets->size;
  req.offsets = pOffsets->pData;

  // First pass with no buffer: only measures the encoded length.
  tEncoderInit(&encoder, NULL, 0);
  code = tEncodeSMqCMCommitOffsetReq(&encoder, &req);
  if (code < 0) {
    tEncoderClear(&encoder);
    goto END;
  }
  tlen = encoder.pos;
  tEncoderClear(&encoder);

  // The sizing pass left code == 0; every failure below must still report an error.
  code = -1;

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto END;

  // Second pass: encode into the real buffer.
  tEncoderInit(&encoder, buf, tlen);
  if (tEncodeSMqCMCommitOffsetReq(&encoder, &req) < 0) {
    tEncoderClear(&encoder);
    goto END;
  }
  tEncoderClear(&encoder);

  pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
  if (pParam == NULL) goto END;
  pParam->tmq = tmq;
  pParam->automatic = automatic;
  pParam->async = async;
  pParam->offsets = pOffsets;
  pParam->freeOffsets = freeOffsets;
  pParam->userCb = userCb;
  pParam->userParam = userParam;
  if (!async) {
    if (tsem_init(&pParam->rspSem, 0, 0) != 0) goto END;
    semInited = 1;
  }

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) goto END;
  sendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqCommitCb;
  sendInfo->msgType = TDMT_MND_MQ_COMMIT_OFFSET;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  // From here on buf and sendInfo must not be freed by this function, and in async mode
  // tmqCommitCb() releases pParam and the temporary offsets.
  sent = 1;

  if (!async) {
    tsem_wait(&pParam->rspSem);
    code = pParam->rspErr;
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  } else {
    code = 0;
  }

END:
  if (!sent) {
    // Nothing was handed off, so everything allocated above is still ours.
    if (buf) taosMemoryFree(buf);
    if (sendInfo) taosMemoryFree(sendInfo);
    if (pParam) {
      if (semInited) tsem_destroy(&pParam->rspSem);
      taosMemoryFree(pParam);
    }
  }

  // An async commit that never reached the transport gets no response callback, so report
  // the failure here. The callbacks are optional.
  if (code != 0 && async) {
    if (automatic) {
      if (tmq->commitCb) {
        tmq->commitCb(tmq, code, (tmq_topic_vgroup_list_t*)pOffsets, tmq->commitCbUserParam);
      }
    } else if (userCb) {
      userCb(tmq, code, (tmq_topic_vgroup_list_t*)pOffsets, userParam);
    }
  }

  // The temporary array is destroyed by tmqCommitCb() only for an async request that was
  // actually sent; in every other case it is released here.
  if (freeOffsets && pOffsets != NULL && !(async && sent)) {
    taosArrayDestroy(pOffsets);
  }
  return code;
}

L
Liu Jicong 已提交
461 462
void tmqAssignDelayedHbTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
463
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
L
Liu Jicong 已提交
464 465
  *pTaskType = TMQ_DELAYED_TASK__HB;
  taosWriteQitem(tmq->delayedTask, pTaskType);
466
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
467 468 469 470
}

void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
471
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
L
Liu Jicong 已提交
472 473
  *pTaskType = TMQ_DELAYED_TASK__COMMIT;
  taosWriteQitem(tmq->delayedTask, pTaskType);
474
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
475 476 477 478
}

void tmqAssignDelayedReportTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
479
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
L
Liu Jicong 已提交
480 481
  *pTaskType = TMQ_DELAYED_TASK__REPORT;
  taosWriteQitem(tmq->delayedTask, pTaskType);
482
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
483 484 485 486 487 488 489 490 491 492 493
}

// Drain the delayed-task queue and run every task found:
//   HB     - ask for endpoints asynchronously, then re-arm the heartbeat timer (1000)
//   COMMIT - start an automatic async commit of all offsets, then re-arm the commit timer
//   REPORT - nothing to do
// Returns 0, or -1 (terrno = TSDB_CODE_OUT_OF_MEMORY) when the temporary batch cannot be
// allocated; in that case the queued tasks are left in place.
int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
  STaosQall* qall = taosAllocateQall();
  if (qall == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  taosReadAllQitems(tmq->delayedTask, qall);
  while (1) {
    // Reset before each fetch so an empty batch is always seen as NULL.
    int8_t* pTaskType = NULL;
    taosGetQitem(qall, (void**)&pTaskType);
    if (pTaskType == NULL) break;

    if (*pTaskType == TMQ_DELAYED_TASK__HB) {
      tmqAskEp(tmq, true);
      taosTmrReset(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer, &tmq->hbTimer);
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
      tmqCommitInner(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam);
      taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer, &tmq->commitTimer);
    } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
    } else {
      ASSERT(0);
    }
    taosFreeQitem(pTaskType);
  }
  taosFreeQall(qall);
  return 0;
}

L
Liu Jicong 已提交
509
void tmqClearUnhandleMsg(tmq_t* tmq) {
L
Liu Jicong 已提交
510
  SMqRspWrapper* msg = NULL;
L
Liu Jicong 已提交
511 512 513 514 515 516 517 518
  while (1) {
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg)
      taosFreeQitem(msg);
    else
      break;
  }

L
fix  
Liu Jicong 已提交
519
  msg = NULL;
L
Liu Jicong 已提交
520 521 522 523 524 525 526 527 528 529
  taosReadAllQitems(tmq->mqueue, tmq->qall);
  while (1) {
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg)
      taosFreeQitem(msg);
    else
      break;
  }
}

L
Liu Jicong 已提交
530 531 532
int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) {
  SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
  pParam->rspErr = code;
L
Liu Jicong 已提交
533
  /*tmq_t* tmq = pParam->tmq;*/
L
Liu Jicong 已提交
534 535 536
  tsem_post(&pParam->rspSem);
  return 0;
}
537

X
Xiaoyu Wang 已提交
538 539 540 541 542
// Append the names of the currently subscribed topics to *topics, creating the list when
// *topics is NULL. Each stored name has everything up to and including its first '.' removed;
// a name without '.' is reported unchanged.
// Returns TMQ_RESP_ERR__SUCCESS, or -1 when the list cannot be created or extended.
tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  if (*topics == NULL) {
    *topics = tmq_list_new();
    if (*topics == NULL) return (tmq_resp_err_t)-1;
  }
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
    if (topic->topicName == NULL) continue;

    // strchr() returns NULL when there is no '.', so never do arithmetic on it unchecked.
    const char* dot = strchr(topic->topicName, '.');
    const char* shortName = (dot != NULL) ? dot + 1 : topic->topicName;
    if (tmq_list_append(*topics, shortName) != 0) {
      return (tmq_resp_err_t)-1;
    }
  }
  return TMQ_RESP_ERR__SUCCESS;
}

// Unsubscribe from everything by subscribing to an empty topic list.
// Returns the result of tmq_subscribe(), or -1 when the empty list cannot be allocated.
tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq) {
  tmq_list_t* lst = tmq_list_new();
  if (lst == NULL) {
    return (tmq_resp_err_t)-1;
  }
  tmq_resp_err_t rsp = tmq_subscribe(tmq, lst);
  tmq_list_destroy(lst);
  return rsp;
}

L
Liu Jicong 已提交
556
#if 0
// Disabled earlier variant of tmq_consumer_new() that took an existing connection. It is
// compiled out and refers to fields (epStatus, epSkipCnt, commit_cb) that the live structs
// either disable or no longer declare.
tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  tmq_t* pTmq = taosMemoryCalloc(sizeof(tmq_t), 1);
  if (pTmq == NULL) {
    return NULL;
  }
  pTmq->pTscObj = (STscObj*)conn;
  pTmq->status = 0;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;
  pTmq->epStatus = 0;
  pTmq->epSkipCnt = 0;
  // set conf
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
  pTmq->autoCommit = conf->autoCommit;
  pTmq->commit_cb = conf->commit_cb;
  pTmq->resetOffsetCfg = conf->resetOffset;

  pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1);
  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  if (pTmq->clientTopics == NULL) {
    taosMemoryFree(pTmq);
    return NULL;
  }

  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();

  tsem_init(&pTmq->rspSem, 0, 0);

  return pTmq;
}
#endif
L
Liu Jicong 已提交
590

L
Liu Jicong 已提交
591
// Create a consumer from `conf` and open its connection.
//
// The first call also creates the shared timer controller (tmqMgmt.timer). The consumer
// copies its settings from `conf`, gets a fresh consumer id, and connects with the configured
// ip/port and user/password (TSDB_DEFAULT_USER / TSDB_DEFAULT_PASS when unset).
// `errstr` and `errstrLen` are currently unused.
//
// Returns the new consumer, or NULL when `conf` is NULL, an allocation fails, the semaphore
// cannot be initialized, or the connection cannot be established. terrno is set to
// TSDB_CODE_OUT_OF_MEMORY when the timer or the consumer object cannot be allocated.
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  if (conf == NULL) {
    return NULL;
  }

  // init timer
  int8_t inited = atomic_val_compare_exchange_8(&tmqMgmt.inited, 0, 1);
  if (inited == 0) {
    tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
    if (tmqMgmt.timer == NULL) {
      // Let a later call retry the initialization.
      atomic_store_8(&tmqMgmt.inited, 0);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
  }

  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  ASSERT(user);
  ASSERT(pass);
  ASSERT(conf->groupId[0]);

  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();
  pTmq->delayedTask = taosOpenQueue();

  if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
    goto FAIL;
  }

  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;
  /*pTmq->epStatus = 0;*/
  /*pTmq->epSkipCnt = 0;*/

  // set conf; bounded copies keep the destination terminated whatever the source holds
  tstrncpy(pTmq->clientId, conf->clientId, sizeof(pTmq->clientId));
  tstrncpy(pTmq->groupId, conf->groupId, sizeof(pTmq->groupId));
  pTmq->withTbName = conf->withTbName;
  pTmq->autoCommit = conf->autoCommit;
  pTmq->autoCommitInterval = conf->autoCommitInterval;
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
  pTmq->resetOffsetCfg = conf->resetOffset;

  // assign consumerId
  pTmq->consumerId = tGenIdPI64();

  // init semaphore
  if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
    goto FAIL;
  }

  // init connection
  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) {
    tsem_destroy(&pTmq->rspSem);
    goto FAIL;
  }

  return pTmq;

FAIL:
  if (pTmq->clientTopics) taosArrayDestroy(pTmq->clientTopics);
  if (pTmq->mqueue) taosCloseQueue(pTmq->mqueue);
  if (pTmq->delayedTask) taosCloseQueue(pTmq->delayedTask);
  if (pTmq->qall) taosFreeQall(pTmq->qall);
  taosMemoryFree(pTmq);
  return NULL;
}

L
Liu Jicong 已提交
667
// User-initiated commit. Forwards to tmqCommitInner() as a non-automatic commit, passing the
// consumer's configured commit callback and its user parameter as the async callback.
tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) {
  tmq_commit_cb* cb = tmq->commitCb;
  void*          cbParam = tmq->commitCbUserParam;
  return tmqCommitInner(tmq, offsets, 0, async, cb, cbParam);
}

L
Liu Jicong 已提交
671 672 673 674 675 676
// Subscribe the consumer to exactly the topics in `topic_list` (an empty list unsubscribes).
//
// Steps: expand each topic to its full name, serialize and send the subscribe request to the
// mnode, wait for the response, poll tmqAskEp() every 500 ms while the consumer is reported
// as not ready, then start the heartbeat timer and, when auto commit is enabled, the commit
// timer (each only if not already running).
//
// Returns 0 on success, -1 on invalid arguments or resource failure, or the response code
// when the request is rejected.
tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
  if (tmq == NULL || topic_list == NULL) return -1;

  const SArray*       container = &topic_list->container;
  int32_t             sz = taosArrayGetSize(container);
  void*               buf = NULL;
  SMsgSendInfo*       sendInfo = NULL;
  SCMSubscribeReq     req = {0};
  int32_t             tlen = 0;
  int32_t             code = -1;
  SMqSubscribeCbParam param = {
      .rspErr = TMQ_RESP_ERR__SUCCESS,
      .tmq = tmq,
  };

  req.consumerId = tmq->consumerId;
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
  req.topicNames = taosArrayInit(sz, sizeof(void*));
  if (req.topicNames == NULL) goto FAIL;

  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);

    SName name = {0};
    tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));

    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      goto FAIL;
    }
    tNameExtractFullName(&name, topicFName);

    tscDebug("subscribe topic: %s", topicFName);

    if (taosArrayPush(req.topicNames, &topicFName) == NULL) {
      // Not stored in the array, so the cleanup below would not find it.
      taosMemoryFree(topicFName);
      goto FAIL;
    }
  }

  tlen = tSerializeSCMSubscribeReq(NULL, &req);
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto FAIL;

  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) goto FAIL;

  if (tsem_init(&param.rspSem, 0, 0) != 0) goto FAIL;

  sendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
  sendInfo->msgType = TDMT_MND_SUBSCRIBE;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  // avoid double free if msg is sent
  buf = NULL;
  sendInfo = NULL;

  // `param` lives on this stack frame, so the response must arrive before returning.
  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);

  code = param.rspErr;
  if (code != 0) goto FAIL;

  while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
    tscDebug("consumer not ready, retry");
    taosMsleep(500);
  }

  // init hb timer
  if (tmq->hbTimer == NULL) {
    tmq->hbTimer = taosTmrStart(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer);
  }

  // init auto commit timer
  if (tmq->autoCommit && tmq->commitTimer == NULL) {
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer);
  }

  code = 0;
FAIL:
  if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree);
  // Both pointers are non-NULL only when the request was never handed to the transport.
  if (buf) taosMemoryFree(buf);
  if (sendInfo) taosMemoryFree(sendInfo);
  return code;
}

L
Liu Jicong 已提交
767
// Register the commit callback and the opaque argument passed back to it. Both are copied
// into the consumer by tmq_consumer_new().
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
  conf->commitCbUserParam = param;
  conf->commitCb = cb;
}
772

L
Liu Jicong 已提交
773
#if 0
// Disabled helper: returns the skipLogNum field of a message's poll response, or 0 for a
// NULL message. Compiled out.
int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) {
  if (tmq_message == NULL) return 0;
  SMqPollRsp* pRsp = &tmq_message->msg;
  return pRsp->skipLogNum;
}
#endif
L
Liu Jicong 已提交
780 781

int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
X
Xiaoyu Wang 已提交
782 783
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;
  SMqClientVg*    pVg = pParam->pVg;
L
Liu Jicong 已提交
784
  SMqClientTopic* pTopic = pParam->pTopic;
X
Xiaoyu Wang 已提交
785
  tmq_t*          tmq = pParam->tmq;
L
Liu Jicong 已提交
786 787 788
  int32_t         vgId = pParam->vgId;
  int32_t         epoch = pParam->epoch;
  taosMemoryFree(pParam);
L
Liu Jicong 已提交
789
  if (code != 0) {
L
Liu Jicong 已提交
790 791
    tscWarn("msg discard from vg %d, epoch %d, code:%x", vgId, epoch, code);
    if (pMsg->pData) taosMemoryFree(pMsg->pData);
L
fix txn  
Liu Jicong 已提交
792
    goto CREATE_MSG_FAIL;
L
Liu Jicong 已提交
793 794
  }

X
Xiaoyu Wang 已提交
795 796 797
  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch < tmqEpoch) {
L
Liu Jicong 已提交
798
    // do not write into queue since updating epoch reset
L
Liu Jicong 已提交
799
    tscWarn("msg discard from vg %d since from earlier epoch, rsp epoch %d, current epoch %d", vgId, msgEpoch,
L
Liu Jicong 已提交
800
            tmqEpoch);
801
    tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
802
    taosMemoryFree(pMsg->pData);
X
Xiaoyu Wang 已提交
803 804 805 806
    return 0;
  }

  if (msgEpoch != tmqEpoch) {
L
Liu Jicong 已提交
807
    tscWarn("mismatch rsp from vg %d, epoch %d, current epoch %d", vgId, msgEpoch, tmqEpoch);
X
Xiaoyu Wang 已提交
808 809
  }

810
  SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
L
Liu Jicong 已提交
811
  if (pRspWrapper == NULL) {
L
Liu Jicong 已提交
812 813
    taosMemoryFree(pMsg->pData);
    tscWarn("msg discard from vg %d, epoch %d since out of memory", vgId, epoch);
L
fix txn  
Liu Jicong 已提交
814
    goto CREATE_MSG_FAIL;
L
Liu Jicong 已提交
815
  }
L
Liu Jicong 已提交
816

L
Liu Jicong 已提交
817 818 819
  pRspWrapper->tmqRspType = TMQ_MSG_TYPE__POLL_RSP;
  pRspWrapper->vgHandle = pVg;
  pRspWrapper->topicHandle = pTopic;
L
Liu Jicong 已提交
820

L
Liu Jicong 已提交
821 822
  memcpy(&pRspWrapper->msg, pMsg->pData, sizeof(SMqRspHead));

L
Liu Jicong 已提交
823
  tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->msg);
L
Liu Jicong 已提交
824
  taosMemoryFree(pMsg->pData);
L
Liu Jicong 已提交
825

L
Liu Jicong 已提交
826 827
  tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld", tmq->consumerId, pVg->vgId,
           pRspWrapper->msg.reqOffset, pRspWrapper->msg.rspOffset);
L
fix  
Liu Jicong 已提交
828

L
Liu Jicong 已提交
829
  taosWriteQitem(tmq->mqueue, pRspWrapper);
830
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
831

L
Liu Jicong 已提交
832
  return 0;
L
fix txn  
Liu Jicong 已提交
833
CREATE_MSG_FAIL:
L
Liu Jicong 已提交
834
  if (epoch == tmq->epoch) {
L
Liu Jicong 已提交
835 836
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }
837
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
838
  return -1;
839 840
}

L
Liu Jicong 已提交
841
// Rebuild the consumer's topic/vgroup table from an ask-ep response and
// publish `epoch` as the consumer's current epoch.
//
// For every topic in pRsp a new SMqClientTopic is built. A vgroup that already
// existed under the same topic name in tmq->clientTopics keeps its current
// offset; any other vgroup starts from the offset carried in the response.
// tmq->status becomes NO_TOPIC when the new table is empty, READY otherwise.
//
// Returns true when at least one vgroup was installed. Returns false when the
// response carried no vgroups, or when the initial allocations failed (in the
// latter case tmq is left untouched).
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
  bool    set = false;
  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];
  tscDebug("consumer %ld update ep epoch %d to epoch %d, topic num: %d", tmq->consumerId, tmq->epoch, epoch,
           topicNumGet);

  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    return false;
  }
  SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pHash == NULL) {
    taosArrayDestroy(newTopics);
    return false;
  }

  // find topic, build hash
  for (int32_t i = 0; i < topicNumGet; i++) {
    SMqClientTopic topic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
    topic.schema = pTopicEp->schema;  // shallow copy of the schema descriptor
    taosHashClear(pHash);             // the hash only ever holds the offsets of one topic
    topic.topicName = strdup(pTopicEp->topic);
    tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);

    tscDebug("consumer %ld update topic: %s", tmq->consumerId, topic.topicName);

    // Remember the current offset of every vgroup the old table had for this topic.
    int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
    for (int32_t j = 0; j < topicNumCur; j++) {
      // find old topic
      SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j);
      if (pTopicCur->vgs && strcmp(pTopicCur->topicName, pTopicEp->topic) == 0) {
        int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
        tscDebug("consumer %ld new vg num: %d", tmq->consumerId, vgNumCur);
        if (vgNumCur == 0) break;
        for (int32_t k = 0; k < vgNumCur; k++) {
          SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, k);
          // bounded write: vgKey is a fixed-size stack buffer
          snprintf(vgKey, sizeof(vgKey), "%s:%d", topic.topicName, pVgCur->vgId);
          tscDebug("consumer %ld epoch %d vg %d build %s", tmq->consumerId, epoch, pVgCur->vgId, vgKey);
          taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t));
        }
        break;
      }
    }

    // Build the new vgroup list, carrying over remembered offsets where present.
    int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
    topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
    for (int32_t j = 0; j < vgNumGet; j++) {
      SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
      snprintf(vgKey, sizeof(vgKey), "%s:%d", topic.topicName, pVgEp->vgId);
      int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
      int64_t  offset = pVgEp->offset;
      tscDebug("consumer %ld epoch %d vg %d offset og to %ld", tmq->consumerId, epoch, pVgEp->vgId, offset);
      if (pOffset != NULL) {
        offset = *pOffset;
        tscDebug("consumer %ld epoch %d vg %d found %s", tmq->consumerId, epoch, pVgEp->vgId, vgKey);
      }
      tscDebug("consumer %ld epoch %d vg %d offset set to %ld", tmq->consumerId, epoch, pVgEp->vgId, offset);
      SMqClientVg clientVg = {
          .pollCnt = 0,
          .currentOffset = offset,
          .vgId = pVgEp->vgId,
          .epSet = pVgEp->epSet,
          .vgStatus = TMQ_VG_STATUS__IDLE,
          .vgSkipCnt = 0,
      };
      taosArrayPush(topic.vgs, &clientVg);
      set = true;
    }
    taosArrayPush(newTopics, &topic);
  }

  // NOTE(review): only the outer array of the old table is destroyed here; the
  // per-topic topicName strings and vgs arrays of the old entries are not
  // released. Poll response wrappers keep pointers into those vgs arrays
  // (vgHandle), so confirm no in-flight callback can still touch them before
  // adding a free.
  if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
  taosHashCleanup(pHash);
  tmq->clientTopics = newTopics;

  if (taosArrayGetSize(tmq->clientTopics) == 0)
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
  else
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);

  // Publish the new epoch last, after the table and status are in place.
  atomic_store_32(&tmq->epoch, epoch);
  return set;
}

L
Liu Jicong 已提交
925
int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
926
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
L
Liu Jicong 已提交
927
  tmq_t*           tmq = pParam->tmq;
L
Liu Jicong 已提交
928
  int8_t           async = pParam->async;
L
Liu Jicong 已提交
929
  pParam->code = code;
930
  if (code != 0) {
L
Liu Jicong 已提交
931
    tscError("consumer %ld get topic endpoint error, not ready, wait:%d", tmq->consumerId, pParam->async);
L
Liu Jicong 已提交
932
    goto END;
933
  }
L
Liu Jicong 已提交
934

L
Liu Jicong 已提交
935
  // tmq's epoch is monotonically increase,
L
Liu Jicong 已提交
936
  // so it's safe to discard any old epoch msg.
L
Liu Jicong 已提交
937
  // Epoch will only increase when received newer epoch ep msg
L
Liu Jicong 已提交
938 939
  SMqRspHead* head = pMsg->pData;
  int32_t     epoch = atomic_load_32(&tmq->epoch);
L
temp  
Liu Jicong 已提交
940
  tscDebug("consumer %ld recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
L
Liu Jicong 已提交
941 942
  if (head->epoch <= epoch) {
    goto END;
943
  }
L
Liu Jicong 已提交
944

L
Liu Jicong 已提交
945
  if (!async) {
L
Liu Jicong 已提交
946 947
    SMqAskEpRsp rsp;
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
X
Xiaoyu Wang 已提交
948 949
    /*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/
    /*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/
950
    tmqUpdateEp(tmq, head->epoch, &rsp);
L
Liu Jicong 已提交
951
    tDeleteSMqAskEpRsp(&rsp);
X
Xiaoyu Wang 已提交
952
  } else {
953
    SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM);
L
Liu Jicong 已提交
954
    if (pWrapper == NULL) {
X
Xiaoyu Wang 已提交
955
      terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
956 957
      code = -1;
      goto END;
X
Xiaoyu Wang 已提交
958
    }
L
Liu Jicong 已提交
959 960 961
    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pWrapper->epoch = head->epoch;
    memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
L
Liu Jicong 已提交
962
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);
L
Liu Jicong 已提交
963

L
Liu Jicong 已提交
964
    taosWriteQitem(tmq->mqueue, pWrapper);
965
    tsem_post(&tmq->rspSem);
966
  }
L
Liu Jicong 已提交
967 968

END:
L
Liu Jicong 已提交
969
  /*atomic_store_8(&tmq->epStatus, 0);*/
L
Liu Jicong 已提交
970
  if (!async) {
L
Liu Jicong 已提交
971
    tsem_post(&pParam->rspSem);
L
Liu Jicong 已提交
972 973
  } else {
    taosMemoryFree(pParam);
L
Liu Jicong 已提交
974 975
  }
  return code;
976 977
}

L
Liu Jicong 已提交
978
// Ask the mnode for the consumer's current topic endpoints (TDMT_MND_MQ_ASK_EP).
//
// async == false : block until tmqAskEpCb() has processed the response and
//                  return the code it recorded in the callback parameter.
// async == true  : return right after the request has been handed to the
//                  transport; tmqAskEpCb() releases the callback parameter.
//
// Returns 0 on success (or after dispatch in async mode), -1 when an
// allocation fails before the request is sent.
int32_t tmqAskEp(tmq_t* tmq, bool async) {
  int32_t code = 0;
#if 0
  int8_t  epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
  if (epStatus == 1) {
    int32_t epSkipCnt = atomic_add_fetch_32(&tmq->epSkipCnt, 1);
    tscTrace("consumer %ld skip ask ep cnt %d", tmq->consumerId, epSkipCnt);
    if (epSkipCnt < 5000) return 0;
  }
  atomic_store_32(&tmq->epSkipCnt, 0);
#endif
  int32_t      tlen = sizeof(SMqAskEpReq);
  SMqAskEpReq* req = taosMemoryCalloc(1, tlen);
  if (req == NULL) {
    tscError("failed to malloc get subscribe ep buf");
    /*atomic_store_8(&tmq->epStatus, 0);*/
    return -1;
  }
  // The request travels in network byte order.
  req->consumerId = htobe64(tmq->consumerId);
  req->epoch = htonl(tmq->epoch);
  strcpy(req->cgroup, tmq->groupId);

  SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
  if (pParam == NULL) {
    tscError("failed to malloc subscribe param");
    taosMemoryFree(req);
    /*atomic_store_8(&tmq->epStatus, 0);*/
    return -1;
  }
  pParam->tmq = tmq;
  pParam->async = async;
  tsem_init(&pParam->rspSem, 0, 0);

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
    taosMemoryFree(req);
    /*atomic_store_8(&tmq->epStatus, 0);*/
    return -1;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = req,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqAskEpCb;
  sendInfo->msgType = TDMT_MND_MQ_ASK_EP;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  tscDebug("consumer %ld ask ep", tmq->consumerId);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  if (!async) {
    // In sync mode this thread owns pParam: wait for the callback, pick up its
    // result, then release the semaphore together with the parameter block.
    // (The semaphore used to be dropped without tsem_destroy on this path.)
    tsem_wait(&pParam->rspSem);
    code = pParam->code;
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  }
  return code;
}

L
Liu Jicong 已提交
1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060
// Move the consumer's position for one (topic, vgroup) pair to the offset
// described by `offset`.
//
// Fails when the offset belongs to a different consumer group, or when no
// vgroup with the given topic name and vgId is present in tmq->clientTopics.
// On success the stored offset is overwritten and tmqClearUnhandleMsg() is
// called on the consumer.
tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
  const SMqOffset* pTarget = &offset->offset;

  if (strcmp(pTarget->cgroup, tmq->groupId) != 0) {
    return TMQ_RESP_ERR__FAIL;
  }

  int32_t topicCnt = taosArrayGetSize(tmq->clientTopics);
  for (int32_t t = 0; t < topicCnt; t++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, t);
    if (strcmp(pTopic->topicName, pTarget->topicName) != 0) {
      continue;
    }

    int32_t vgCnt = taosArrayGetSize(pTopic->vgs);
    for (int32_t v = 0; v < vgCnt; v++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, v);
      if (pVg->vgId != pTarget->vgId) {
        continue;
      }
      pVg->currentOffset = pTarget->offset;
      tmqClearUnhandleMsg(tmq);
      return TMQ_RESP_ERR__SUCCESS;
    }
  }

  return TMQ_RESP_ERR__FAIL;
}

1070
// Allocate and fill a poll request for one vgroup of one topic.
//
// The request offset is the vgroup's current offset when that is non-negative;
// otherwise the consumer's reset-offset setting is used, and the call fails if
// that setting is TMQ_CONF__RESET_OFFSET__NONE.
//
// Returns a zero-initialised, heap-allocated request, or NULL on failure.
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
  int64_t reqOffset = pVg->currentOffset;
  if (reqOffset < 0) {
    if (tmq->resetOffsetCfg == TMQ_CONF__RESET_OFFSET__NONE) {
      tscError("unable to poll since no committed offset but reset offset is set to none");
      return NULL;
    }
    reqOffset = tmq->resetOffsetCfg;
  }

  SMqPollReq* pReq = taosMemoryCalloc(1, sizeof(SMqPollReq));
  if (pReq == NULL) {
    return NULL;
  }

  // subKey = <groupId> TMQ_SEPARATOR <topicName>
  int32_t groupLen = strlen(tmq->groupId);
  memcpy(pReq->subKey, tmq->groupId, groupLen);
  pReq->subKey[groupLen] = TMQ_SEPARATOR;
  strcpy(pReq->subKey + groupLen + 1, pTopic->topicName);

  pReq->consumerId = tmq->consumerId;
  pReq->epoch = tmq->epoch;
  pReq->withTbName = tmq->withTbName;
  pReq->timeout = timeout;
  pReq->currentOffset = reqOffset;
  pReq->reqId = generateRequestId();

  // Only the message head is converted to network byte order.
  pReq->head.contLen = htonl(sizeof(SMqPollReq));
  pReq->head.vgId = htonl(pVg->vgId);
  return pReq;
}

L
Liu Jicong 已提交
1107 1108 1109
// Build the user-visible response object from a queued poll response.
//
// Topic name, db name and vgId are copied from the handles stored in the
// wrapper, and the decoded response is copied by value (memcpy), so pointers
// held inside it are shared with the wrapper's copy rather than duplicated.
// When the response carries no schema of its own, the topic's schema is
// installed as the result schema.
//
// Returns a heap-allocated SMqRspObj, or NULL when the allocation fails
// (previously a failed allocation was dereferenced).
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  if (pRspObj == NULL) {
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;  // no block selected yet
  memcpy(&pRspObj->rsp, &pWrapper->msg, sizeof(SMqDataBlkRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  if (!pWrapper->msg.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  return pRspObj;
}

1125
// Send one poll request to every vgroup that is currently idle.
//
// A vgroup is claimed by switching its status IDLE -> WAIT with a
// compare-and-swap; vgroups that are not idle are skipped and their skip
// counter is bumped. If building a request fails, the vgroup is put back to
// IDLE, tmq->rspSem is posted and the function returns -1 without visiting the
// remaining vgroups.
//
// Returns 0 when all vgroups have been visited, -1 on the first failure.
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
  int32_t topicCnt = taosArrayGetSize(tmq->clientTopics);
  for (int32_t t = 0; t < topicCnt; t++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, t);

    int32_t vgCnt = taosArrayGetSize(pTopic->vgs);
    for (int32_t v = 0; v < vgCnt; v++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, v);

      int32_t prevStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
      if (prevStatus != TMQ_VG_STATUS__IDLE) {
        // a request is already outstanding for this vgroup
        int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
        tscTrace("consumer %ld epoch %d skip vg %d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId, vgSkipCnt);
        continue;
      }
      atomic_store_32(&pVg->vgSkipCnt, 0);

      SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, timeout, pTopic, pVg);
      if (pReq == NULL) {
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }

      SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
      if (pParam == NULL) {
        taosMemoryFree(pReq);
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }
      pParam->tmq = tmq;
      pParam->pVg = pVg;
      pParam->pTopic = pTopic;
      pParam->vgId = pVg->vgId;
      pParam->epoch = tmq->epoch;

      SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
      if (sendInfo == NULL) {
        taosMemoryFree(pReq);
        taosMemoryFree(pParam);
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }

      sendInfo->msgInfo = (SDataBuf){
          .pData = pReq,
          .len = sizeof(SMqPollReq),
          .handle = NULL,
      };
      sendInfo->requestId = pReq->reqId;
      sendInfo->requestObjRefId = 0;
      sendInfo->param = pParam;
      sendInfo->fp = tmqPollCb;
      sendInfo->msgType = TDMT_VND_CONSUME;

      int64_t transporterId = 0;
      tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %ld, reqId %lu", tmq->consumerId,
               pTopic->topicName, pVg->vgId, tmq->epoch, pVg->currentOffset, pReq->reqId);
      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);

      pVg->pollCnt++;
      tmq->pollCnt++;
    }
  }
  return 0;
}

L
Liu Jicong 已提交
1198 1199
// Handle a queued response that is not a poll response.
//
// Only TMQ_MSG_TYPE__EP_RSP is supported: when the wrapper's epoch is newer
// than the consumer's, the endpoint table is rebuilt through tmqUpdateEp() and
// *pReset is set to true; otherwise *pReset is set to false.
//
// The decoded SMqAskEpRsp carried inside the wrapper is released here in both
// cases, mirroring the tDeleteSMqAskEpRsp() call on the synchronous path in
// tmqAskEpCb(); the caller only frees the queue item itself. Previously the
// decoded payload was never released.
//
// Returns 0 on success, -1 for any other response type (*pReset untouched).
int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType != TMQ_MSG_TYPE__EP_RSP) {
    return -1;
  }

  SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
  SMqAskEpRsp*        rspMsg = &pEpRspWrapper->msg;

  if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) {
    tmqUpdateEp(tmq, rspWrapper->epoch, rspMsg);
    /*tmqClearUnhandleMsg(tmq);*/
    *pReset = true;
  } else {
    *pReset = false;
  }

  tDeleteSMqAskEpRsp(rspMsg);
  return 0;
}

1216
// Drain queued responses until one yields data for the caller.
//
// Items are taken from tmq->qall; when that is empty it is refilled once from
// tmq->mqueue. Poll responses whose epoch matches the consumer's advance the
// vgroup offset and mark the vgroup idle; the first such response with at
// least one block is converted to an SMqRspObj and returned. Poll responses
// from another epoch are dropped. Any other response is passed to
// tmqHandleNoPollRsp(), and if that reports a reset while pollIfReset is set,
// a new round of polls is sent.
//
// Returns the response object, or NULL once both queues are empty.
SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
  for (;;) {
    SMqRspWrapper* pItem = NULL;
    taosGetQitem(tmq->qall, (void**)&pItem);
    if (pItem == NULL) {
      taosReadAllQitems(tmq->mqueue, tmq->qall);
      taosGetQitem(tmq->qall, (void**)&pItem);
      if (pItem == NULL) {
        return NULL;
      }
    }

    if (pItem->tmqRspType != TMQ_MSG_TYPE__POLL_RSP) {
      bool reset = false;
      tmqHandleNoPollRsp(tmq, pItem, &reset);
      taosFreeQitem(pItem);
      if (pollIfReset && reset) {
        tscDebug("consumer %ld reset and repoll", tmq->consumerId);
        tmqPollImpl(tmq, timeout);
      }
      continue;
    }

    SMqPollRspWrapper* pPollItem = (SMqPollRspWrapper*)pItem;
    if (pPollItem->msg.head.epoch != atomic_load_32(&tmq->epoch)) {
      // response belongs to another epoch: drop it
      taosFreeQitem(pPollItem);
      continue;
    }

    SMqClientVg* pVg = pPollItem->vgHandle;
    pVg->currentOffset = pPollItem->msg.rspOffset;
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

    if (pPollItem->msg.blockNum == 0) {
      // nothing to hand out; the offset update above is all that was needed
      taosFreeQitem(pPollItem);
      continue;
    }

    SMqRspObj* pRsp = tmqBuildRspFromWrapper(pPollItem);
    taosFreeQitem(pPollItem);
    return pRsp;
  }
}

1262
// Poll the consumer for the next batch of data.
//
// Already-queued responses are served first. Otherwise the function loops:
// run delayed tasks, send poll requests to idle vgroups, drain the response
// queue, then sleep on tmq->rspSem until a response is posted or the wait
// expires.
//
// timeout : budget in the unit of taosGetTimestampMs(); -1 means no deadline.
//
// Returns a response object (as TAOS_RES*), or NULL when the consumer is still
// in INIT status, sending a poll fails, or the timeout elapses without data.
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
  SMqRspObj* rspObj;
  int64_t    startTime = taosGetTimestampMs();

  rspObj = tmqHandleAllRsp(tmq, timeout, false);
  if (rspObj) {
    return (TAOS_RES*)rspObj;
  }

  // in no topic status also need process delayed task
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
    return NULL;
  }

  while (1) {
    tmqHandleAllDelayedTask(tmq);
    if (tmqPollImpl(tmq, timeout) < 0) return NULL;

    rspObj = tmqHandleAllRsp(tmq, timeout, false);
    if (rspObj) {
      return (TAOS_RES*)rspObj;
    }
    if (timeout != -1) {
      int64_t elapsed = taosGetTimestampMs() - startTime;
      if (elapsed > timeout) {
        tscDebug("consumer %ld (epoch %d) timeout, no rsp", tmq->consumerId, tmq->epoch);
        return NULL;
      }
      // Wait for what is left of the budget. The previous code passed the
      // elapsed time here, i.e. ~0 on the first pass (busy spin) and longer
      // the closer the deadline got.
      int64_t remaining = timeout - elapsed;
      tsem_timewait(&tmq->rspSem, remaining * 1000);
    } else {
      // use tsem_timewait instead of tsem_wait to avoid unexpected stuck
      tsem_timewait(&tmq->rspSem, 500 * 1000);
    }
  }
}

L
Liu Jicong 已提交
1299
// Close a consumer.
//
// When the consumer is in READY status, its offsets are committed
// synchronously and it is then re-subscribed with an empty topic list. The
// first failing step's error is returned. In any other status nothing is done.
tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) {
  // TODO: free resources
  if (tmq->status != TMQ_CONSUMER_STATUS__READY) {
    return TMQ_RESP_ERR__SUCCESS;
  }

  tmq_resp_err_t rsp = tmq_commit_sync(tmq, NULL);
  if (rsp != TMQ_RESP_ERR__SUCCESS) {
    return rsp;
  }

  // subscribing to an empty list drops every current subscription
  tmq_list_t* emptyList = tmq_list_new();
  rsp = tmq_subscribe(tmq, emptyList);
  tmq_list_destroy(emptyList);

  return (rsp != TMQ_RESP_ERR__SUCCESS) ? rsp : TMQ_RESP_ERR__SUCCESS;
}
L
Liu Jicong 已提交
1317 1318 1319 1320

// Translate a tmq result code into a human-readable string.
// The two generic codes map to fixed strings; every other value is resolved
// through tstrerror().
const char* tmq_err2str(tmq_resp_err_t err) {
  switch (err) {
    case TMQ_RESP_ERR__SUCCESS:
      return "success";
    case TMQ_RESP_ERR__FAIL:
      return "fail";
    default:
      return tstrerror(err);
  }
}
L
Liu Jicong 已提交
1327

L
Liu Jicong 已提交
1328
// Return the topic name of a tmq result with everything up to and including
// the first '.' stripped. If the stored name contains no '.', the whole name
// is returned (previously this case did pointer arithmetic on NULL).
// Returns NULL when `res` is not a tmq result. The returned pointer refers to
// storage inside `res`.
const char* tmq_get_topic_name(TAOS_RES* res) {
  if (!TD_RES_TMQ(res)) {
    return NULL;
  }

  SMqRspObj*  pRspObj = (SMqRspObj*)res;
  const char* dot = strchr(pRspObj->topic, '.');
  return (dot != NULL) ? dot + 1 : pRspObj->topic;
}

L
Liu Jicong 已提交
1337 1338 1339 1340 1341 1342 1343 1344 1345
// Return the database name of a tmq result with everything up to and
// including the first '.' stripped. If the stored name contains no '.', the
// whole name is returned (previously this case did pointer arithmetic on
// NULL). Returns NULL when `res` is not a tmq result. The returned pointer
// refers to storage inside `res`.
const char* tmq_get_db_name(TAOS_RES* res) {
  if (!TD_RES_TMQ(res)) {
    return NULL;
  }

  SMqRspObj*  pRspObj = (SMqRspObj*)res;
  const char* dot = strchr(pRspObj->db, '.');
  return (dot != NULL) ? dot + 1 : pRspObj->db;
}

L
Liu Jicong 已提交
1346 1347 1348 1349 1350 1351 1352 1353
// Return the vgroup id a tmq result came from, or -1 when `res` is not a tmq
// result.
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (!TD_RES_TMQ(res)) {
    return -1;
  }
  return ((SMqRspObj*)res)->vgId;
}
L
Liu Jicong 已提交
1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366

// Return the table name of the block the result iterator currently points at.
// Returns NULL when `res` is not a tmq result, the response carries no table
// names, or the iterator is outside [0, blockNum).
const char* tmq_get_table_name(TAOS_RES* res) {
  if (!TD_RES_TMQ(res)) {
    return NULL;
  }

  SMqRspObj* pRspObj = (SMqRspObj*)res;
  bool       hasNames = pRspObj->rsp.withTbName && pRspObj->rsp.blockTbName != NULL;
  bool       iterValid = pRspObj->resIter >= 0 && pRspObj->resIter < pRspObj->rsp.blockNum;
  if (hasNames && iterValid) {
    return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter);
  }
  return NULL;
}
1367 1368

// Commit offsets without reporting a result to the caller: forwards to
// tmqCommitInner() with the flag pair (0, 1) and the user's callback and
// callback parameter. The inner call's return value is discarded.
// NOTE(review): presumably the second flag selects asynchronous mode -
// confirm against tmqCommitInner().
void tmq_commit_async(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, tmq_commit_cb* cb, void* param) {
  (void)tmqCommitInner(tmq, offsets, 0, 1, cb, param);
}

1372
// Commit offsets and return the outcome: forwards to tmqCommitInner() with the
// flag pair (0, 0) and no callback.
tmq_resp_err_t tmq_commit_sync(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets) {
  tmq_resp_err_t rsp = tmqCommitInner(tmq, offsets, 0, 0, NULL, NULL);
  return rsp;
}