tmq.c 44.0 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
H
Haojun Liao 已提交
19
#include "tdatablock.h"
20 21 22
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
X
Xiaoyu Wang 已提交
23
#include "tqueue.h"
24
#include "tref.h"
L
Liu Jicong 已提交
25 26
#include "ttimer.h"

L
Liu Jicong 已提交
27 28 29 30 31 32 33 34
int32_t tmqAskEp(tmq_t* tmq, bool async);

typedef struct {
  int8_t inited;
  tmr_h  timer;
} SMqMgmt;

static SMqMgmt tmqMgmt = {0};
35

L
Liu Jicong 已提交
36 37 38 39 40 41
typedef struct {
  int8_t  tmqRspType;
  int32_t epoch;
} SMqRspWrapper;

typedef struct {
L
Liu Jicong 已提交
42 43 44
  int8_t      tmqRspType;
  int32_t     epoch;
  SMqAskEpRsp msg;
L
Liu Jicong 已提交
45 46
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
47
struct tmq_list_t {
L
Liu Jicong 已提交
48
  SArray container;
L
Liu Jicong 已提交
49
};
L
Liu Jicong 已提交
50

L
Liu Jicong 已提交
51
struct tmq_topic_vgroup_t {
L
Liu Jicong 已提交
52
  SMqOffset offset;
L
Liu Jicong 已提交
53 54 55
};

struct tmq_topic_vgroup_list_t {
L
Liu Jicong 已提交
56
  SArray container;  // SArray<tmq_topic_vgroup_t*>
L
Liu Jicong 已提交
57 58 59
};

struct tmq_conf_t {
60 61 62 63 64 65 66 67 68 69 70
  char     clientId[256];
  char     groupId[TSDB_CGROUP_LEN];
  int8_t   autoCommit;
  int8_t   resetOffset;
  int8_t   withTbName;
  uint16_t port;
  int32_t  autoCommitInterval;
  char*    ip;
  char*    user;
  char*    pass;
  /*char*          db;*/
71
  tmq_commit_cb* commitCb;
L
Liu Jicong 已提交
72
  void*          commitCbUserParam;
L
Liu Jicong 已提交
73 74 75
};

struct tmq_t {
L
Liu Jicong 已提交
76
  // conf
L
Liu Jicong 已提交
77 78
  char           groupId[TSDB_CGROUP_LEN];
  char           clientId[256];
79
  int8_t         withTbName;
L
Liu Jicong 已提交
80
  int8_t         autoCommit;
L
Liu Jicong 已提交
81
  int32_t        autoCommitInterval;
L
Liu Jicong 已提交
82
  int32_t        resetOffsetCfg;
L
Liu Jicong 已提交
83
  int64_t        consumerId;
L
Liu Jicong 已提交
84 85
  tmq_commit_cb* commitCb;
  void*          commitCbUserParam;
L
Liu Jicong 已提交
86 87 88 89

  // status
  int8_t  status;
  int32_t epoch;
L
Liu Jicong 已提交
90 91
#if 0
  int8_t  epStatus;
L
Liu Jicong 已提交
92
  int32_t epSkipCnt;
L
Liu Jicong 已提交
93
#endif
L
Liu Jicong 已提交
94 95
  int64_t pollCnt;

L
Liu Jicong 已提交
96 97 98 99 100
  // timer
  tmr_h hbTimer;
  tmr_h reportTimer;
  tmr_h commitTimer;

L
Liu Jicong 已提交
101 102 103 104
  // connection
  STscObj* pTscObj;

  // container
L
Liu Jicong 已提交
105
  SArray*     clientTopics;  // SArray<SMqClientTopic>
L
Liu Jicong 已提交
106
  STaosQueue* mqueue;        // queue of rsp
L
Liu Jicong 已提交
107
  STaosQall*  qall;
L
Liu Jicong 已提交
108 109 110 111
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit

  // ctl
  tsem_t rspSem;
L
Liu Jicong 已提交
112 113
};

X
Xiaoyu Wang 已提交
114 115 116 117 118 119 120 121
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT,
};

enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY,
122
  TMQ_CONSUMER_STATUS__NO_TOPIC,
L
Liu Jicong 已提交
123 124
};

L
Liu Jicong 已提交
125 126 127 128 129 130
enum {
  TMQ_DELAYED_TASK__HB = 1,
  TMQ_DELAYED_TASK__REPORT,
  TMQ_DELAYED_TASK__COMMIT,
};

L
Liu Jicong 已提交
131
typedef struct {
132 133 134 135
  // statistics
  int64_t pollCnt;
  // offset
  int64_t currentOffset;
L
Liu Jicong 已提交
136
  // connection info
137
  int32_t vgId;
X
Xiaoyu Wang 已提交
138
  int32_t vgStatus;
L
Liu Jicong 已提交
139
  int32_t vgSkipCnt;
140 141 142
  SEpSet  epSet;
} SMqClientVg;

L
Liu Jicong 已提交
143
typedef struct {
144
  // subscribe info
L
Liu Jicong 已提交
145 146 147 148
  char* topicName;

  SArray* vgs;  // SArray<SMqClientVg>

L
Liu Jicong 已提交
149 150
  int8_t         isSchemaAdaptive;
  SSchemaWrapper schema;
151 152
} SMqClientTopic;

L
Liu Jicong 已提交
153 154 155 156 157
typedef struct {
  int8_t          tmqRspType;
  int32_t         epoch;
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
L
Liu Jicong 已提交
158
  SMqDataBlkRsp   msg;
L
Liu Jicong 已提交
159 160
} SMqPollRspWrapper;

L
Liu Jicong 已提交
161
typedef struct {
L
Liu Jicong 已提交
162 163 164 165
  tmq_t*         tmq;
  tsem_t         rspSem;
  tmq_resp_err_t rspErr;
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
166

L
Liu Jicong 已提交
167
typedef struct {
168
  tmq_t*  tmq;
L
Liu Jicong 已提交
169
  int32_t code;
L
Liu Jicong 已提交
170
  int32_t async;
X
Xiaoyu Wang 已提交
171
  tsem_t  rspSem;
172 173
} SMqAskEpCbParam;

L
Liu Jicong 已提交
174
typedef struct {
L
Liu Jicong 已提交
175 176
  tmq_t*          tmq;
  SMqClientVg*    pVg;
L
Liu Jicong 已提交
177
  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
178
  int32_t         epoch;
L
Liu Jicong 已提交
179
  int32_t         vgId;
L
Liu Jicong 已提交
180
  tsem_t          rspSem;
X
Xiaoyu Wang 已提交
181
} SMqPollCbParam;
182

L
Liu Jicong 已提交
183
typedef struct {
L
Liu Jicong 已提交
184
  tmq_t*         tmq;
L
Liu Jicong 已提交
185 186
  int8_t         async;
  int8_t         automatic;
L
Liu Jicong 已提交
187
  int8_t         freeOffsets;
L
Liu Jicong 已提交
188
  tmq_commit_cb* userCb;
L
Liu Jicong 已提交
189 190
  tsem_t         rspSem;
  tmq_resp_err_t rspErr;
L
Liu Jicong 已提交
191
  SArray*        offsets;
L
Liu Jicong 已提交
192
  void*          userParam;
L
Liu Jicong 已提交
193
} SMqCommitCbParam;
L
Liu Jicong 已提交
194

195
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
196
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
197
  conf->withTbName = -1;
L
Liu Jicong 已提交
198
  conf->autoCommit = true;
L
Liu Jicong 已提交
199
  conf->autoCommitInterval = 5000;
X
Xiaoyu Wang 已提交
200
  conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
201 202 203
  return conf;
}

L
Liu Jicong 已提交
204
void tmq_conf_destroy(tmq_conf_t* conf) {
L
Liu Jicong 已提交
205 206 207 208 209 210
  if (conf) {
    if (conf->ip) taosMemoryFree(conf->ip);
    if (conf->user) taosMemoryFree(conf->user);
    if (conf->pass) taosMemoryFree(conf->pass);
    taosMemoryFree(conf);
  }
L
Liu Jicong 已提交
211 212 213
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
214 215
  if (strcmp(key, "group.id") == 0) {
    strcpy(conf->groupId, value);
L
Liu Jicong 已提交
216
    return TMQ_CONF_OK;
217
  }
L
Liu Jicong 已提交
218

219 220
  if (strcmp(key, "client.id") == 0) {
    strcpy(conf->clientId, value);
L
Liu Jicong 已提交
221 222
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
223

L
Liu Jicong 已提交
224 225
  if (strcmp(key, "enable.auto.commit") == 0) {
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
226
      conf->autoCommit = true;
L
Liu Jicong 已提交
227 228
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
229
      conf->autoCommit = false;
L
Liu Jicong 已提交
230 231 232 233
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
234
  }
L
Liu Jicong 已提交
235

L
Liu Jicong 已提交
236 237 238 239 240
  if (strcmp(key, "auto.commit.interval.ms") == 0) {
    conf->autoCommitInterval = atoi(value);
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
241 242 243 244 245 246 247 248 249 250 251 252 253 254
  if (strcmp(key, "auto.offset.reset") == 0) {
    if (strcmp(value, "none") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "earliest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
255

256 257 258
  if (strcmp(key, "msg.with.table.name") == 0) {
    if (strcmp(value, "true") == 0) {
      conf->withTbName = 1;
L
Liu Jicong 已提交
259
      return TMQ_CONF_OK;
260 261
    } else if (strcmp(value, "false") == 0) {
      conf->withTbName = 0;
L
Liu Jicong 已提交
262
      return TMQ_CONF_OK;
263 264
    } else if (strcmp(value, "none") == 0) {
      conf->withTbName = -1;
L
Liu Jicong 已提交
265
      return TMQ_CONF_OK;
266 267 268 269 270
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
271
  if (strcmp(key, "td.connect.ip") == 0) {
L
Liu Jicong 已提交
272 273 274
    conf->ip = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
275
  if (strcmp(key, "td.connect.user") == 0) {
L
Liu Jicong 已提交
276 277 278
    conf->user = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
279
  if (strcmp(key, "td.connect.pass") == 0) {
L
Liu Jicong 已提交
280 281 282
    conf->pass = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
283
  if (strcmp(key, "td.connect.port") == 0) {
L
Liu Jicong 已提交
284 285 286
    conf->port = atoi(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
287
  if (strcmp(key, "td.connect.db") == 0) {
288
    /*conf->db = strdup(value);*/
L
Liu Jicong 已提交
289 290 291
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
292
  return TMQ_CONF_UNKNOWN;
293 294 295
}

tmq_list_t* tmq_list_new() {
L
Liu Jicong 已提交
296 297
  //
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
298 299
}

L
Liu Jicong 已提交
300 301 302
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  SArray* container = &list->container;
  char*   topic = strdup(src);
L
fix  
Liu Jicong 已提交
303
  if (taosArrayPush(container, &topic) == NULL) return -1;
304 305 306
  return 0;
}

L
Liu Jicong 已提交
307
void tmq_list_destroy(tmq_list_t* list) {
L
Liu Jicong 已提交
308
  SArray* container = &list->container;
L
Liu Jicong 已提交
309
  taosArrayDestroyP(container, taosMemoryFree);
L
Liu Jicong 已提交
310 311
}

L
Liu Jicong 已提交
312 313 314 315 316 317 318 319 320 321
int32_t tmq_list_get_size(const tmq_list_t* list) {
  const SArray* container = &list->container;
  return taosArrayGetSize(container);
}

char** tmq_list_to_c_array(const tmq_list_t* list) {
  const SArray* container = &list->container;
  return container->pData;
}

L
Liu Jicong 已提交
322 323 324 325
static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
  return sprintf(dst, "%s:%d", topicName, vg);
}

L
Liu Jicong 已提交
326 327 328 329 330 331 332 333 334 335 336
int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
  SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
  pParam->rspErr = code == 0 ? TMQ_RESP_ERR__SUCCESS : TMQ_RESP_ERR__FAIL;
  if (pParam->async) {
    if (pParam->automatic && pParam->tmq->commitCb) {
      pParam->tmq->commitCb(pParam->tmq, pParam->rspErr, (tmq_topic_vgroup_list_t*)pParam->offsets,
                            pParam->tmq->commitCbUserParam);
    } else if (!pParam->automatic && pParam->userCb) {
      pParam->userCb(pParam->tmq, pParam->rspErr, (tmq_topic_vgroup_list_t*)pParam->offsets, pParam->userParam);
    }

L
Liu Jicong 已提交
337
    if (pParam->freeOffsets) {
L
Liu Jicong 已提交
338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354
      taosArrayDestroy(pParam->offsets);
    }

    taosMemoryFree(pParam);
  } else {
    tsem_post(&pParam->rspSem);
  }
  return 0;
}

int32_t tmqCommitInner(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8_t automatic, int8_t async,
                       tmq_commit_cb* userCb, void* userParam) {
  SMqCMCommitOffsetReq req;
  SArray*              pOffsets = NULL;
  void*                buf = NULL;
  SMqCommitCbParam*    pParam = NULL;
  SMsgSendInfo*        sendInfo = NULL;
L
Liu Jicong 已提交
355 356
  int8_t               freeOffsets;
  int32_t              code = -1;
L
Liu Jicong 已提交
357 358

  if (offsets == NULL) {
L
Liu Jicong 已提交
359
    freeOffsets = 1;
L
Liu Jicong 已提交
360 361 362 363 364 365 366 367 368 369 370 371 372 373
    pOffsets = taosArrayInit(0, sizeof(SMqOffset));
    for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
      SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
      for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
        SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
        SMqOffset    offset;
        tstrncpy(offset.topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
        tstrncpy(offset.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
        offset.vgId = pVg->vgId;
        offset.offset = pVg->currentOffset;
        taosArrayPush(pOffsets, &offset);
      }
    }
  } else {
L
Liu Jicong 已提交
374
    freeOffsets = 0;
L
Liu Jicong 已提交
375 376 377 378 379 380
    pOffsets = (SArray*)&offsets->container;
  }

  req.num = (int32_t)pOffsets->size;
  req.offsets = pOffsets->pData;

L
Liu Jicong 已提交
381 382 383 384
  SEncoder encoder;

  tEncoderInit(&encoder, NULL, 0);
  code = tEncodeSMqCMCommitOffsetReq(&encoder, &req);
L
Liu Jicong 已提交
385 386 387
  if (code < 0) {
    goto END;
  }
L
Liu Jicong 已提交
388
  int32_t tlen = encoder.pos;
L
Liu Jicong 已提交
389 390
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
L
Liu Jicong 已提交
391
    tEncoderClear(&encoder);
L
Liu Jicong 已提交
392 393
    goto END;
  }
L
Liu Jicong 已提交
394 395
  tEncoderClear(&encoder);

L
Liu Jicong 已提交
396 397 398 399 400 401 402 403 404 405 406 407
  tEncoderInit(&encoder, buf, tlen);
  tEncodeSMqCMCommitOffsetReq(&encoder, &req);
  tEncoderClear(&encoder);

  pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
  if (pParam == NULL) {
    goto END;
  }
  pParam->tmq = tmq;
  pParam->automatic = automatic;
  pParam->async = async;
  pParam->offsets = pOffsets;
L
Liu Jicong 已提交
408
  pParam->freeOffsets = freeOffsets;
L
Liu Jicong 已提交
409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444
  pParam->userCb = userCb;
  pParam->userParam = userParam;
  if (!async) tsem_init(&pParam->rspSem, 0, 0);

  sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo));
  if (sendInfo == NULL) goto END;
  sendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqCommitCb;
  sendInfo->msgType = TDMT_MND_MQ_COMMIT_OFFSET;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  if (!async) {
    tsem_wait(&pParam->rspSem);
    code = pParam->rspErr;
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  }

  // avoid double free if msg is sent
  buf = NULL;

  code = 0;
END:
  if (buf) taosMemoryFree(buf);
L
Liu Jicong 已提交
445 446
  /*if (pParam) taosMemoryFree(pParam);*/
  /*if (sendInfo) taosMemoryFree(sendInfo);*/
L
Liu Jicong 已提交
447 448 449 450 451 452 453 454 455

  if (code != 0 && async) {
    if (automatic) {
      tmq->commitCb(tmq, TMQ_RESP_ERR__FAIL, (tmq_topic_vgroup_list_t*)pOffsets, tmq->commitCbUserParam);
    } else {
      userCb(tmq, TMQ_RESP_ERR__FAIL, (tmq_topic_vgroup_list_t*)pOffsets, userParam);
    }
  }

L
Liu Jicong 已提交
456
  if (!async && freeOffsets) {
L
Liu Jicong 已提交
457 458 459 460 461
    taosArrayDestroy(pOffsets);
  }
  return code;
}

L
Liu Jicong 已提交
462 463
void tmqAssignDelayedHbTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
464
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
L
Liu Jicong 已提交
465 466
  *pTaskType = TMQ_DELAYED_TASK__HB;
  taosWriteQitem(tmq->delayedTask, pTaskType);
467
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
468 469 470 471
}

void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
472
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
L
Liu Jicong 已提交
473 474
  *pTaskType = TMQ_DELAYED_TASK__COMMIT;
  taosWriteQitem(tmq->delayedTask, pTaskType);
475
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
476 477 478 479
}

void tmqAssignDelayedReportTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
480
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
L
Liu Jicong 已提交
481 482
  *pTaskType = TMQ_DELAYED_TASK__REPORT;
  taosWriteQitem(tmq->delayedTask, pTaskType);
483
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
484 485 486 487 488 489 490 491 492 493 494
}

int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
  STaosQall* qall = taosAllocateQall();
  taosReadAllQitems(tmq->delayedTask, qall);
  while (1) {
    int8_t* pTaskType = NULL;
    taosGetQitem(qall, (void**)&pTaskType);
    if (pTaskType == NULL) break;

    if (*pTaskType == TMQ_DELAYED_TASK__HB) {
L
Liu Jicong 已提交
495
      tmqAskEp(tmq, true);
L
Liu Jicong 已提交
496 497
      taosTmrReset(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer, &tmq->hbTimer);
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
L
Liu Jicong 已提交
498 499
      /*tmq_commit(tmq, NULL, true);*/
      tmqCommitInner(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam);
L
Liu Jicong 已提交
500 501 502 503 504
      taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer, &tmq->commitTimer);
    } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
    } else {
      ASSERT(0);
    }
L
Liu Jicong 已提交
505
    taosFreeQitem(pTaskType);
L
Liu Jicong 已提交
506 507 508 509 510
  }
  taosFreeQall(qall);
  return 0;
}

L
Liu Jicong 已提交
511
void tmqClearUnhandleMsg(tmq_t* tmq) {
L
Liu Jicong 已提交
512
  SMqRspWrapper* msg = NULL;
L
Liu Jicong 已提交
513 514 515 516 517 518 519 520
  while (1) {
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg)
      taosFreeQitem(msg);
    else
      break;
  }

L
fix  
Liu Jicong 已提交
521
  msg = NULL;
L
Liu Jicong 已提交
522 523 524 525 526 527 528 529 530 531
  taosReadAllQitems(tmq->mqueue, tmq->qall);
  while (1) {
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg)
      taosFreeQitem(msg);
    else
      break;
  }
}

L
Liu Jicong 已提交
532 533 534
int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) {
  SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
  pParam->rspErr = code;
L
Liu Jicong 已提交
535
  /*tmq_t* tmq = pParam->tmq;*/
L
Liu Jicong 已提交
536 537 538
  tsem_post(&pParam->rspSem);
  return 0;
}
539

X
Xiaoyu Wang 已提交
540 541 542 543 544
tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  if (*topics == NULL) {
    *topics = tmq_list_new();
  }
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
L
Liu Jicong 已提交
545
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
L
Liu Jicong 已提交
546
    tmq_list_append(*topics, strchr(topic->topicName, '.') + 1);
X
Xiaoyu Wang 已提交
547 548 549 550 551
  }
  return TMQ_RESP_ERR__SUCCESS;
}

tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq) {
L
Liu Jicong 已提交
552 553 554 555
  tmq_list_t*    lst = tmq_list_new();
  tmq_resp_err_t rsp = tmq_subscribe(tmq, lst);
  tmq_list_destroy(lst);
  return rsp;
X
Xiaoyu Wang 已提交
556 557
}

L
Liu Jicong 已提交
558
#if 0
559
tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
wafwerar's avatar
wafwerar 已提交
560
  tmq_t* pTmq = taosMemoryCalloc(sizeof(tmq_t), 1);
561 562 563 564 565 566
  if (pTmq == NULL) {
    return NULL;
  }
  pTmq->pTscObj = (STscObj*)conn;
  pTmq->status = 0;
  pTmq->pollCnt = 0;
L
Liu Jicong 已提交
567
  pTmq->epoch = 0;
L
fix txn  
Liu Jicong 已提交
568
  pTmq->epStatus = 0;
L
temp  
Liu Jicong 已提交
569
  pTmq->epSkipCnt = 0;
L
Liu Jicong 已提交
570
  // set conf
571 572
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
L
Liu Jicong 已提交
573
  pTmq->autoCommit = conf->autoCommit;
574
  pTmq->commit_cb = conf->commit_cb;
L
Liu Jicong 已提交
575
  pTmq->resetOffsetCfg = conf->resetOffset;
L
Liu Jicong 已提交
576

L
Liu Jicong 已提交
577 578 579 580 581 582 583 584 585 586
  pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1);
  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  if (pTmq->clientTopics == NULL) {
    taosMemoryFree(pTmq);
    return NULL;
  }

  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();

L
Liu Jicong 已提交
587 588
  tsem_init(&pTmq->rspSem, 0, 0);

L
Liu Jicong 已提交
589 590
  return pTmq;
}
L
Liu Jicong 已提交
591
#endif
L
Liu Jicong 已提交
592

L
Liu Jicong 已提交
593
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
L
Liu Jicong 已提交
594 595 596 597 598 599 600 601 602 603 604
  // init timer
  int8_t inited = atomic_val_compare_exchange_8(&tmqMgmt.inited, 0, 1);
  if (inited == 0) {
    tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
    if (tmqMgmt.timer == NULL) {
      atomic_store_8(&tmqMgmt.inited, 0);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
  }

L
Liu Jicong 已提交
605 606 607 608
  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    return NULL;
  }
L
Liu Jicong 已提交
609

L
Liu Jicong 已提交
610 611 612 613 614
  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  ASSERT(user);
  ASSERT(pass);
L
Liu Jicong 已提交
615
  ASSERT(conf->groupId[0]);
L
Liu Jicong 已提交
616

L
Liu Jicong 已提交
617 618 619 620 621 622 623 624
  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();
  pTmq->delayedTask = taosOpenQueue();

  if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
    goto FAIL;
  }
L
Liu Jicong 已提交
625

L
Liu Jicong 已提交
626 627
  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
L
Liu Jicong 已提交
628 629
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;
L
Liu Jicong 已提交
630 631
  /*pTmq->epStatus = 0;*/
  /*pTmq->epSkipCnt = 0;*/
L
Liu Jicong 已提交
632

L
Liu Jicong 已提交
633 634 635
  // set conf
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
636
  pTmq->withTbName = conf->withTbName;
L
Liu Jicong 已提交
637
  pTmq->autoCommit = conf->autoCommit;
L
Liu Jicong 已提交
638
  pTmq->autoCommitInterval = conf->autoCommitInterval;
L
Liu Jicong 已提交
639 640
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
L
Liu Jicong 已提交
641 642
  pTmq->resetOffsetCfg = conf->resetOffset;

L
Liu Jicong 已提交
643
  // assign consumerId
L
Liu Jicong 已提交
644
  pTmq->consumerId = tGenIdPI64();
X
Xiaoyu Wang 已提交
645

L
Liu Jicong 已提交
646 647 648 649
  // init semaphore
  if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
    goto FAIL;
  }
L
Liu Jicong 已提交
650

L
Liu Jicong 已提交
651 652 653 654 655 656
  // init connection
  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) {
    tsem_destroy(&pTmq->rspSem);
    goto FAIL;
  }
L
Liu Jicong 已提交
657

658
  return pTmq;
L
Liu Jicong 已提交
659 660 661 662 663 664 665 666

FAIL:
  if (pTmq->clientTopics) taosArrayDestroy(pTmq->clientTopics);
  if (pTmq->mqueue) taosCloseQueue(pTmq->mqueue);
  if (pTmq->delayedTask) taosCloseQueue(pTmq->delayedTask);
  if (pTmq->qall) taosFreeQall(pTmq->qall);
  taosMemoryFree(pTmq);
  return NULL;
667 668
}

L
Liu Jicong 已提交
669
tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) {
L
Liu Jicong 已提交
670 671
  return tmqCommitInner(tmq, offsets, 0, async, tmq->commitCb, tmq->commitCbUserParam);
#if 0
L
Liu Jicong 已提交
672 673 674
  // TODO: add read write lock
  SRequestObj*   pRequest = NULL;
  tmq_resp_err_t resp = TMQ_RESP_ERR__SUCCESS;
L
Liu Jicong 已提交
675 676
  // build msg
  // send to mnode
L
Liu Jicong 已提交
677
  SMqCMCommitOffsetReq req;
L
Liu Jicong 已提交
678
  SArray*              pOffsets = NULL;
L
Liu Jicong 已提交
679 680

  if (offsets == NULL) {
L
Liu Jicong 已提交
681
    pOffsets = taosArrayInit(0, sizeof(SMqOffset));
L
Liu Jicong 已提交
682 683 684 685 686 687 688 689 690
    for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
      SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
      for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
        SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
        SMqOffset    offset;
        strcpy(offset.topicName, pTopic->topicName);
        strcpy(offset.cgroup, tmq->groupId);
        offset.vgId = pVg->vgId;
        offset.offset = pVg->currentOffset;
L
Liu Jicong 已提交
691
        taosArrayPush(pOffsets, &offset);
L
Liu Jicong 已提交
692 693
      }
    }
L
Liu Jicong 已提交
694 695
    req.num = pOffsets->size;
    req.offsets = pOffsets->pData;
L
Liu Jicong 已提交
696
  } else {
L
Liu Jicong 已提交
697 698
    req.num = taosArrayGetSize(&offsets->container);
    req.offsets = (SMqOffset*)offsets->container.pData;
L
Liu Jicong 已提交
699
  }
L
Liu Jicong 已提交
700

H
Hongze Cheng 已提交
701
  SEncoder encoder;
L
Liu Jicong 已提交
702

H
Hongze Cheng 已提交
703
  tEncoderInit(&encoder, NULL, 0);
L
Liu Jicong 已提交
704
  tEncodeSMqCMCommitOffsetReq(&encoder, &req);
L
Liu Jicong 已提交
705
  int32_t tlen = encoder.pos;
wafwerar's avatar
wafwerar 已提交
706
  void*   buf = taosMemoryMalloc(tlen);
L
Liu Jicong 已提交
707
  if (buf == NULL) {
H
Hongze Cheng 已提交
708
    tEncoderClear(&encoder);
L
Liu Jicong 已提交
709 710
    return -1;
  }
H
Hongze Cheng 已提交
711
  tEncoderClear(&encoder);
L
Liu Jicong 已提交
712

H
Hongze Cheng 已提交
713
  tEncoderInit(&encoder, buf, tlen);
L
Liu Jicong 已提交
714
  tEncodeSMqCMCommitOffsetReq(&encoder, &req);
H
Hongze Cheng 已提交
715
  tEncoderClear(&encoder);
L
Liu Jicong 已提交
716

L
Liu Jicong 已提交
717
  pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_MQ_COMMIT_OFFSET);
L
Liu Jicong 已提交
718 719 720 721
  if (pRequest == NULL) {
    tscError("failed to malloc request");
  }

L
Liu Jicong 已提交
722
  SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
L
Liu Jicong 已提交
723 724 725 726 727
  if (pParam == NULL) {
    return -1;
  }
  pParam->tmq = tmq;
  tsem_init(&pParam->rspSem, 0, 0);
L
fix  
Liu Jicong 已提交
728
  pParam->async = async;
L
Liu Jicong 已提交
729
  pParam->offsets = pOffsets;
L
Liu Jicong 已提交
730

X
Xiaoyu Wang 已提交
731 732 733 734 735
  pRequest->body.requestMsg = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };
L
Liu Jicong 已提交
736 737

  SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
L
Liu Jicong 已提交
738
  sendInfo->requestObjRefId = 0;
L
Liu Jicong 已提交
739 740
  sendInfo->param = pParam;
  sendInfo->fp = tmqCommitCb;
L
Liu Jicong 已提交
741 742 743 744 745
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

L
Liu Jicong 已提交
746 747 748
  if (!async) {
    tsem_wait(&pParam->rspSem);
    resp = pParam->rspErr;
L
Liu Jicong 已提交
749 750
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
L
fix  
Liu Jicong 已提交
751

L
Liu Jicong 已提交
752 753
    if (pOffsets) {
      taosArrayDestroy(pOffsets);
L
Liu Jicong 已提交
754
    }
L
Liu Jicong 已提交
755 756 757
  }

  return resp;
L
Liu Jicong 已提交
758
#endif
L
Liu Jicong 已提交
759 760
}

L
Liu Jicong 已提交
761 762 763 764 765 766
tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
  SCMSubscribeReq req = {0};
  int32_t         code = -1;
767 768

  req.consumerId = tmq->consumerId;
L
Liu Jicong 已提交
769
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
770
  req.topicNames = taosArrayInit(sz, sizeof(void*));
L
Liu Jicong 已提交
771
  if (req.topicNames == NULL) goto FAIL;
772

L
Liu Jicong 已提交
773 774
  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);
775 776

    SName name = {0};
L
Liu Jicong 已提交
777
    tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));
778

L
Liu Jicong 已提交
779 780 781
    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      goto FAIL;
782
    }
L
Liu Jicong 已提交
783
    tNameExtractFullName(&name, topicFName);
784

L
Liu Jicong 已提交
785 786 787
    tscDebug("subscribe topic: %s", topicFName);

    taosArrayPush(req.topicNames, &topicFName);
788 789
  }

L
Liu Jicong 已提交
790 791 792 793
  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto FAIL;

794 795 796
  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);

L
Liu Jicong 已提交
797 798
  SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo));
  if (sendInfo == NULL) goto FAIL;
799

X
Xiaoyu Wang 已提交
800 801 802 803
  SMqSubscribeCbParam param = {
      .rspErr = TMQ_RESP_ERR__SUCCESS,
      .tmq = tmq,
  };
L
Liu Jicong 已提交
804

L
Liu Jicong 已提交
805 806 807
  if (tsem_init(&param.rspSem, 0, 0) != 0) goto FAIL;

  sendInfo->msgInfo = (SDataBuf){
X
Xiaoyu Wang 已提交
808 809 810 811
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };
812

L
Liu Jicong 已提交
813 814
  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
L
Liu Jicong 已提交
815 816
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
L
Liu Jicong 已提交
817 818
  sendInfo->msgType = TDMT_MND_SUBSCRIBE;

819 820 821 822 823
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

L
Liu Jicong 已提交
824 825 826
  // avoid double free if msg is sent
  buf = NULL;

L
Liu Jicong 已提交
827 828
  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);
829

L
Liu Jicong 已提交
830 831 832
  code = param.rspErr;
  if (code != 0) goto FAIL;

L
Liu Jicong 已提交
833
  while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
L
fix  
Liu Jicong 已提交
834
    tscDebug("consumer not ready, retry");
L
Liu Jicong 已提交
835 836
    taosMsleep(500);
  }
837

L
Liu Jicong 已提交
838
  // init hb timer
839 840 841
  if (tmq->hbTimer == NULL) {
    tmq->hbTimer = taosTmrStart(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer);
  }
L
Liu Jicong 已提交
842 843

  // init auto commit timer
844
  if (tmq->autoCommit && tmq->commitTimer == NULL) {
L
Liu Jicong 已提交
845 846 847
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer);
  }

L
Liu Jicong 已提交
848 849 850
  code = 0;
FAIL:
  if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree);
L
Liu Jicong 已提交
851
  if (code != 0 && buf) {
L
Liu Jicong 已提交
852 853 854
    taosMemoryFree(buf);
  }
  return code;
855 856
}

L
Liu Jicong 已提交
857
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
L
Liu Jicong 已提交
858
  //
859
  conf->commitCb = cb;
L
Liu Jicong 已提交
860
  conf->commitCbUserParam = param;
L
Liu Jicong 已提交
861
}
862

863
#if 0
L
Liu Jicong 已提交
864 865 866 867 868 869 870 871 872 873 874 875 876 877 878
TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbName, const char* sql) {
  STscObj*     pTscObj = (STscObj*)taos;
  SRequestObj* pRequest = NULL;
  SQuery*      pQueryNode = NULL;
  char*        astStr = NULL;
  int32_t      sqlLen;

  terrno = TSDB_CODE_SUCCESS;
  if (taos == NULL || streamName == NULL || sql == NULL) {
    tscError("invalid parameters for creating stream, connObj:%p, stream name:%s, sql:%s", taos, streamName, sql);
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    goto _return;
  }
  sqlLen = strlen(sql);

L
Liu Jicong 已提交
879 880
  if (strlen(tbName) >= TSDB_TABLE_NAME_LEN) {
    tscError("output tb name too long, max length:%d", TSDB_TABLE_NAME_LEN - 1);
L
Liu Jicong 已提交
881 882 883 884 885 886 887 888 889 890 891 892
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    goto _return;
  }

  if (sqlLen > TSDB_MAX_ALLOWED_SQL_LEN) {
    tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    goto _return;
  }

  tscDebug("start to create stream: %s", streamName);

H
Haojun Liao 已提交
893
  int32_t code = 0;
L
Liu Jicong 已提交
894
  CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
D
stmt  
dapan1121 已提交
895
  CHECK_CODE_GOTO(parseSql(pRequest, false, &pQueryNode, NULL), _return);
L
Liu Jicong 已提交
896 897 898 899 900 901 902 903 904 905 906 907 908 909
  CHECK_CODE_GOTO(nodesNodeToString(pQueryNode->pRoot, false, &astStr, NULL), _return);

  /*printf("%s\n", pStr);*/

  SName name = {.acctId = pTscObj->acctId, .type = TSDB_TABLE_NAME_T};
  strcpy(name.dbname, pRequest->pDb);
  strcpy(name.tname, streamName);

  SCMCreateStreamReq req = {
      .igExists = 1,
      .ast = astStr,
      .sql = (char*)sql,
  };
  tNameExtractFullName(&name, req.name);
L
Liu Jicong 已提交
910
  strcpy(req.targetStbFullName, tbName);
L
Liu Jicong 已提交
911 912

  int   tlen = tSerializeSCMCreateStreamReq(NULL, 0, &req);
wafwerar's avatar
wafwerar 已提交
913
  void* buf = taosMemoryMalloc(tlen);
L
Liu Jicong 已提交
914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935
  if (buf == NULL) {
    goto _return;
  }

  tSerializeSCMCreateStreamReq(buf, tlen, &req);

  pRequest->body.requestMsg = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };
  pRequest->type = TDMT_MND_CREATE_STREAM;

  SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
  SEpSet        epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  tsem_wait(&pRequest->body.rspSem);

_return:
wafwerar's avatar
wafwerar 已提交
936
  taosMemoryFreeClear(astStr);
L
Liu Jicong 已提交
937 938 939 940 941 942 943 944 945 946 947
  qDestroyQuery(pQueryNode);
  /*if (sendInfo != NULL) {*/
  /*destroySendMsgInfo(sendInfo);*/
  /*}*/

  if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
    pRequest->code = terrno;
  }

  return pRequest;
}
948
#endif
L
Liu Jicong 已提交
949

L
Liu Jicong 已提交
950
#if 0
L
Liu Jicong 已提交
951 952
int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) {
  if (tmq_message == NULL) return 0;
L
Liu Jicong 已提交
953
  SMqPollRsp* pRsp = &tmq_message->msg;
L
Liu Jicong 已提交
954 955
  return pRsp->skipLogNum;
}
L
Liu Jicong 已提交
956
#endif
L
Liu Jicong 已提交
957 958

int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
X
Xiaoyu Wang 已提交
959 960
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;
  SMqClientVg*    pVg = pParam->pVg;
L
Liu Jicong 已提交
961
  SMqClientTopic* pTopic = pParam->pTopic;
X
Xiaoyu Wang 已提交
962
  tmq_t*          tmq = pParam->tmq;
L
Liu Jicong 已提交
963 964 965
  int32_t         vgId = pParam->vgId;
  int32_t         epoch = pParam->epoch;
  taosMemoryFree(pParam);
L
Liu Jicong 已提交
966
  if (code != 0) {
L
Liu Jicong 已提交
967 968
    tscWarn("msg discard from vg %d, epoch %d, code:%x", vgId, epoch, code);
    if (pMsg->pData) taosMemoryFree(pMsg->pData);
L
fix txn  
Liu Jicong 已提交
969
    goto CREATE_MSG_FAIL;
L
Liu Jicong 已提交
970 971
  }

X
Xiaoyu Wang 已提交
972 973 974
  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch < tmqEpoch) {
L
Liu Jicong 已提交
975
    // do not write into queue since updating epoch reset
L
Liu Jicong 已提交
976
    tscWarn("msg discard from vg %d since from earlier epoch, rsp epoch %d, current epoch %d", vgId, msgEpoch,
L
Liu Jicong 已提交
977
            tmqEpoch);
978
    tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
979
    taosMemoryFree(pMsg->pData);
X
Xiaoyu Wang 已提交
980 981 982 983
    return 0;
  }

  if (msgEpoch != tmqEpoch) {
L
Liu Jicong 已提交
984
    tscWarn("mismatch rsp from vg %d, epoch %d, current epoch %d", vgId, msgEpoch, tmqEpoch);
X
Xiaoyu Wang 已提交
985 986
  }

987
  SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
L
Liu Jicong 已提交
988
  if (pRspWrapper == NULL) {
L
Liu Jicong 已提交
989 990
    taosMemoryFree(pMsg->pData);
    tscWarn("msg discard from vg %d, epoch %d since out of memory", vgId, epoch);
L
fix txn  
Liu Jicong 已提交
991
    goto CREATE_MSG_FAIL;
L
Liu Jicong 已提交
992
  }
L
Liu Jicong 已提交
993

L
Liu Jicong 已提交
994 995 996
  pRspWrapper->tmqRspType = TMQ_MSG_TYPE__POLL_RSP;
  pRspWrapper->vgHandle = pVg;
  pRspWrapper->topicHandle = pTopic;
L
Liu Jicong 已提交
997

L
Liu Jicong 已提交
998 999
  memcpy(&pRspWrapper->msg, pMsg->pData, sizeof(SMqRspHead));

L
Liu Jicong 已提交
1000
  tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->msg);
L
Liu Jicong 已提交
1001
  taosMemoryFree(pMsg->pData);
L
Liu Jicong 已提交
1002

L
Liu Jicong 已提交
1003 1004
  tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld", tmq->consumerId, pVg->vgId,
           pRspWrapper->msg.reqOffset, pRspWrapper->msg.rspOffset);
L
fix  
Liu Jicong 已提交
1005

L
Liu Jicong 已提交
1006
  taosWriteQitem(tmq->mqueue, pRspWrapper);
1007
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
1008

L
Liu Jicong 已提交
1009
  return 0;
L
fix txn  
Liu Jicong 已提交
1010
CREATE_MSG_FAIL:
L
Liu Jicong 已提交
1011
  if (epoch == tmq->epoch) {
L
Liu Jicong 已提交
1012 1013
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }
1014
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
1015
  return -1;
1016 1017
}

L
Liu Jicong 已提交
1018
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
L
Liu Jicong 已提交
1019
  /*printf("call update ep %d\n", epoch);*/
X
Xiaoyu Wang 已提交
1020
  bool    set = false;
L
Liu Jicong 已提交
1021 1022
  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];
L
Liu Jicong 已提交
1023 1024
  tscDebug("consumer %ld update ep epoch %d to epoch %d, topic num: %d", tmq->consumerId, tmq->epoch, epoch,
           topicNumGet);
L
Liu Jicong 已提交
1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036
  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    return false;
  }
  SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pHash == NULL) {
    taosArrayDestroy(newTopics);
    return false;
  }

  // find topic, build hash
  for (int32_t i = 0; i < topicNumGet; i++) {
X
Xiaoyu Wang 已提交
1037 1038
    SMqClientTopic topic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
L
Liu Jicong 已提交
1039
    topic.schema = pTopicEp->schema;
L
Liu Jicong 已提交
1040
    taosHashClear(pHash);
X
Xiaoyu Wang 已提交
1041
    topic.topicName = strdup(pTopicEp->topic);
L
Liu Jicong 已提交
1042

L
Liu Jicong 已提交
1043
    tscDebug("consumer %ld update topic: %s", tmq->consumerId, topic.topicName);
L
Liu Jicong 已提交
1044 1045 1046 1047 1048 1049
    int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
    for (int32_t j = 0; j < topicNumCur; j++) {
      // find old topic
      SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j);
      if (pTopicCur->vgs && strcmp(pTopicCur->topicName, pTopicEp->topic) == 0) {
        int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
L
Liu Jicong 已提交
1050
        tscDebug("consumer %ld new vg num: %d", tmq->consumerId, vgNumCur);
L
Liu Jicong 已提交
1051 1052 1053 1054
        if (vgNumCur == 0) break;
        for (int32_t k = 0; k < vgNumCur; k++) {
          SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, k);
          sprintf(vgKey, "%s:%d", topic.topicName, pVgCur->vgId);
L
Liu Jicong 已提交
1055
          tscDebug("consumer %ld epoch %d vg %d build %s", tmq->consumerId, epoch, pVgCur->vgId, vgKey);
L
Liu Jicong 已提交
1056 1057 1058 1059 1060 1061 1062 1063 1064
          taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t));
        }
        break;
      }
    }

    int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
    topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
    for (int32_t j = 0; j < vgNumGet; j++) {
X
Xiaoyu Wang 已提交
1065
      SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
L
Liu Jicong 已提交
1066 1067 1068
      sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId);
      int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
      int64_t  offset = pVgEp->offset;
L
Liu Jicong 已提交
1069
      tscDebug("consumer %ld epoch %d vg %d offset og to %ld", tmq->consumerId, epoch, pVgEp->vgId, offset);
L
Liu Jicong 已提交
1070 1071
      if (pOffset != NULL) {
        offset = *pOffset;
L
Liu Jicong 已提交
1072
        tscDebug("consumer %ld epoch %d vg %d found %s", tmq->consumerId, epoch, pVgEp->vgId, vgKey);
L
Liu Jicong 已提交
1073
      }
L
Liu Jicong 已提交
1074
      tscDebug("consumer %ld epoch %d vg %d offset set to %ld", tmq->consumerId, epoch, pVgEp->vgId, offset);
X
Xiaoyu Wang 已提交
1075 1076
      SMqClientVg clientVg = {
          .pollCnt = 0,
L
Liu Jicong 已提交
1077
          .currentOffset = offset,
X
Xiaoyu Wang 已提交
1078 1079 1080
          .vgId = pVgEp->vgId,
          .epSet = pVgEp->epSet,
          .vgStatus = TMQ_VG_STATUS__IDLE,
L
Liu Jicong 已提交
1081
          .vgSkipCnt = 0,
X
Xiaoyu Wang 已提交
1082 1083 1084 1085
      };
      taosArrayPush(topic.vgs, &clientVg);
      set = true;
    }
L
Liu Jicong 已提交
1086
    taosArrayPush(newTopics, &topic);
X
Xiaoyu Wang 已提交
1087
  }
L
Liu Jicong 已提交
1088
  if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
L
Liu Jicong 已提交
1089
  taosHashCleanup(pHash);
L
Liu Jicong 已提交
1090
  tmq->clientTopics = newTopics;
1091

1092 1093 1094 1095
  if (taosArrayGetSize(tmq->clientTopics) == 0)
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
  else
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
1096

X
Xiaoyu Wang 已提交
1097 1098 1099 1100
  atomic_store_32(&tmq->epoch, epoch);
  return set;
}

L
Liu Jicong 已提交
1101
int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
1102
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
L
Liu Jicong 已提交
1103
  tmq_t*           tmq = pParam->tmq;
L
Liu Jicong 已提交
1104
  int8_t           async = pParam->async;
L
Liu Jicong 已提交
1105
  pParam->code = code;
1106
  if (code != 0) {
L
Liu Jicong 已提交
1107
    tscError("consumer %ld get topic endpoint error, not ready, wait:%d", tmq->consumerId, pParam->async);
L
Liu Jicong 已提交
1108
    goto END;
1109
  }
L
Liu Jicong 已提交
1110

L
Liu Jicong 已提交
1111
  // tmq's epoch is monotonically increase,
L
Liu Jicong 已提交
1112
  // so it's safe to discard any old epoch msg.
L
Liu Jicong 已提交
1113
  // Epoch will only increase when received newer epoch ep msg
L
Liu Jicong 已提交
1114 1115
  SMqRspHead* head = pMsg->pData;
  int32_t     epoch = atomic_load_32(&tmq->epoch);
L
temp  
Liu Jicong 已提交
1116
  tscDebug("consumer %ld recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
L
Liu Jicong 已提交
1117 1118
  if (head->epoch <= epoch) {
    goto END;
1119
  }
L
Liu Jicong 已提交
1120

L
Liu Jicong 已提交
1121
  if (!async) {
L
Liu Jicong 已提交
1122 1123
    SMqAskEpRsp rsp;
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
X
Xiaoyu Wang 已提交
1124 1125
    /*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/
    /*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/
1126
    tmqUpdateEp(tmq, head->epoch, &rsp);
L
Liu Jicong 已提交
1127
    tDeleteSMqAskEpRsp(&rsp);
X
Xiaoyu Wang 已提交
1128
  } else {
1129
    SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM);
L
Liu Jicong 已提交
1130
    if (pWrapper == NULL) {
X
Xiaoyu Wang 已提交
1131
      terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
1132 1133
      code = -1;
      goto END;
X
Xiaoyu Wang 已提交
1134
    }
L
Liu Jicong 已提交
1135 1136 1137
    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pWrapper->epoch = head->epoch;
    memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
L
Liu Jicong 已提交
1138
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);
L
Liu Jicong 已提交
1139

L
Liu Jicong 已提交
1140
    taosWriteQitem(tmq->mqueue, pWrapper);
1141
    tsem_post(&tmq->rspSem);
1142
  }
L
Liu Jicong 已提交
1143 1144

END:
L
Liu Jicong 已提交
1145
  /*atomic_store_8(&tmq->epStatus, 0);*/
L
Liu Jicong 已提交
1146
  if (!async) {
L
Liu Jicong 已提交
1147
    tsem_post(&pParam->rspSem);
L
Liu Jicong 已提交
1148 1149
  } else {
    taosMemoryFree(pParam);
L
Liu Jicong 已提交
1150 1151
  }
  return code;
1152 1153
}

L
Liu Jicong 已提交
1154
int32_t tmqAskEp(tmq_t* tmq, bool async) {
L
Liu Jicong 已提交
1155
  int32_t code = 0;
L
Liu Jicong 已提交
1156
#if 0
L
Liu Jicong 已提交
1157
  int8_t  epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
L
fix txn  
Liu Jicong 已提交
1158
  if (epStatus == 1) {
L
temp  
Liu Jicong 已提交
1159
    int32_t epSkipCnt = atomic_add_fetch_32(&tmq->epSkipCnt, 1);
L
Liu Jicong 已提交
1160
    tscTrace("consumer %ld skip ask ep cnt %d", tmq->consumerId, epSkipCnt);
L
Liu Jicong 已提交
1161
    if (epSkipCnt < 5000) return 0;
L
fix txn  
Liu Jicong 已提交
1162
  }
L
temp  
Liu Jicong 已提交
1163
  atomic_store_32(&tmq->epSkipCnt, 0);
L
Liu Jicong 已提交
1164
#endif
L
Liu Jicong 已提交
1165
  int32_t      tlen = sizeof(SMqAskEpReq);
L
Liu Jicong 已提交
1166
  SMqAskEpReq* req = taosMemoryCalloc(1, tlen);
L
Liu Jicong 已提交
1167
  if (req == NULL) {
L
Liu Jicong 已提交
1168
    tscError("failed to malloc get subscribe ep buf");
L
Liu Jicong 已提交
1169
    /*atomic_store_8(&tmq->epStatus, 0);*/
L
Liu Jicong 已提交
1170
    return -1;
L
Liu Jicong 已提交
1171
  }
L
Liu Jicong 已提交
1172 1173 1174
  req->consumerId = htobe64(tmq->consumerId);
  req->epoch = htonl(tmq->epoch);
  strcpy(req->cgroup, tmq->groupId);
1175

L
Liu Jicong 已提交
1176
  SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
L
Liu Jicong 已提交
1177 1178
  if (pParam == NULL) {
    tscError("failed to malloc subscribe param");
wafwerar's avatar
wafwerar 已提交
1179
    taosMemoryFree(req);
L
Liu Jicong 已提交
1180
    /*atomic_store_8(&tmq->epStatus, 0);*/
L
Liu Jicong 已提交
1181
    return -1;
L
Liu Jicong 已提交
1182 1183
  }
  pParam->tmq = tmq;
L
Liu Jicong 已提交
1184
  pParam->async = async;
X
Xiaoyu Wang 已提交
1185
  tsem_init(&pParam->rspSem, 0, 0);
1186

wafwerar's avatar
wafwerar 已提交
1187
  SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo));
L
Liu Jicong 已提交
1188 1189
  if (sendInfo == NULL) {
    tsem_destroy(&pParam->rspSem);
wafwerar's avatar
wafwerar 已提交
1190 1191
    taosMemoryFree(pParam);
    taosMemoryFree(req);
L
Liu Jicong 已提交
1192
    /*atomic_store_8(&tmq->epStatus, 0);*/
L
Liu Jicong 已提交
1193 1194 1195 1196 1197 1198 1199 1200 1201 1202
    return -1;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = req,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
L
Liu Jicong 已提交
1203 1204 1205
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqAskEpCb;
L
Liu Jicong 已提交
1206
  sendInfo->msgType = TDMT_MND_MQ_ASK_EP;
1207

L
Liu Jicong 已提交
1208 1209
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

L
add log  
Liu Jicong 已提交
1210 1211
  tscDebug("consumer %ld ask ep", tmq->consumerId);

L
Liu Jicong 已提交
1212 1213
  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
1214

L
Liu Jicong 已提交
1215
  if (!async) {
L
Liu Jicong 已提交
1216 1217 1218 1219 1220
    tsem_wait(&pParam->rspSem);
    code = pParam->code;
    taosMemoryFree(pParam);
  }
  return code;
1221 1222
}

L
Liu Jicong 已提交
1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236
tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
  const SMqOffset* pOffset = &offset->offset;
  if (strcmp(pOffset->cgroup, tmq->groupId) != 0) {
    return TMQ_RESP_ERR__FAIL;
  }
  int32_t sz = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < sz; i++) {
    SMqClientTopic* clientTopic = taosArrayGet(tmq->clientTopics, i);
    if (strcmp(clientTopic->topicName, pOffset->topicName) == 0) {
      int32_t vgSz = taosArrayGetSize(clientTopic->vgs);
      for (int32_t j = 0; j < vgSz; j++) {
        SMqClientVg* pVg = taosArrayGet(clientTopic->vgs, j);
        if (pVg->vgId == pOffset->vgId) {
          pVg->currentOffset = pOffset->offset;
L
Liu Jicong 已提交
1237
          tmqClearUnhandleMsg(tmq);
L
Liu Jicong 已提交
1238 1239 1240 1241 1242 1243 1244 1245
          return TMQ_RESP_ERR__SUCCESS;
        }
      }
    }
  }
  return TMQ_RESP_ERR__FAIL;
}

L
Liu Jicong 已提交
1246
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t waitTime, SMqClientTopic* pTopic, SMqClientVg* pVg) {
L
Liu Jicong 已提交
1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257
  int64_t reqOffset;
  if (pVg->currentOffset >= 0) {
    reqOffset = pVg->currentOffset;
  } else {
    if (tmq->resetOffsetCfg == TMQ_CONF__RESET_OFFSET__NONE) {
      tscError("unable to poll since no committed offset but reset offset is set to none");
      return NULL;
    }
    reqOffset = tmq->resetOffsetCfg;
  }

L
Liu Jicong 已提交
1258
  SMqPollReq* pReq = taosMemoryCalloc(1, sizeof(SMqPollReq));
1259 1260 1261
  if (pReq == NULL) {
    return NULL;
  }
L
Liu Jicong 已提交
1262

L
Liu Jicong 已提交
1263 1264 1265 1266 1267 1268 1269
  /*strcpy(pReq->topic, pTopic->topicName);*/
  /*strcpy(pReq->cgroup, tmq->groupId);*/

  int32_t tlen = strlen(tmq->groupId);
  memcpy(pReq->subKey, tmq->groupId, tlen);
  pReq->subKey[tlen] = TMQ_SEPARATOR;
  strcpy(pReq->subKey + tlen + 1, pTopic->topicName);
1270

1271
  pReq->withTbName = tmq->withTbName;
1272
  pReq->waitTime = waitTime;
L
Liu Jicong 已提交
1273
  pReq->consumerId = tmq->consumerId;
X
Xiaoyu Wang 已提交
1274
  pReq->epoch = tmq->epoch;
L
Liu Jicong 已提交
1275
  pReq->currentOffset = reqOffset;
L
Liu Jicong 已提交
1276
  pReq->reqId = generateRequestId();
1277 1278

  pReq->head.vgId = htonl(pVg->vgId);
L
Liu Jicong 已提交
1279
  pReq->head.contLen = htonl(sizeof(SMqPollReq));
1280 1281 1282
  return pReq;
}

L
Liu Jicong 已提交
1283 1284 1285
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  pRspObj->resType = RES_TYPE__TMQ;
L
Liu Jicong 已提交
1286
  strncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
L
Liu Jicong 已提交
1287
  pRspObj->vgId = pWrapper->vgHandle->vgId;
L
Liu Jicong 已提交
1288 1289
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->msg, sizeof(SMqDataBlkRsp));
L
Liu Jicong 已提交
1290

L
Liu Jicong 已提交
1291 1292
  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
L
Liu Jicong 已提交
1293 1294 1295
  if (!pWrapper->msg.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }
L
Liu Jicong 已提交
1296

L
Liu Jicong 已提交
1297
  return pRspObj;
X
Xiaoyu Wang 已提交
1298 1299
}

1300
int32_t tmqPollImpl(tmq_t* tmq, int64_t waitTime) {
L
fix  
Liu Jicong 已提交
1301
  /*printf("call poll\n");*/
X
Xiaoyu Wang 已提交
1302 1303 1304 1305 1306 1307
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      int32_t      vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
      if (vgStatus != TMQ_VG_STATUS__IDLE) {
L
Liu Jicong 已提交
1308
        int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
L
Liu Jicong 已提交
1309
        tscTrace("consumer %ld epoch %d skip vg %d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId, vgSkipCnt);
X
Xiaoyu Wang 已提交
1310
        continue;
L
Liu Jicong 已提交
1311
        /*if (vgSkipCnt < 10000) continue;*/
L
temp  
Liu Jicong 已提交
1312 1313 1314 1315 1316 1317 1318
#if 0
        if (skipCnt < 30000) {
          continue;
        } else {
        tscDebug("consumer %ld skip vg %d skip too much reset", tmq->consumerId, pVg->vgId);
        }
#endif
X
Xiaoyu Wang 已提交
1319
      }
L
Liu Jicong 已提交
1320
      atomic_store_32(&pVg->vgSkipCnt, 0);
L
Liu Jicong 已提交
1321
      SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, waitTime, pTopic, pVg);
X
Xiaoyu Wang 已提交
1322 1323
      if (pReq == NULL) {
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
1324
        tsem_post(&tmq->rspSem);
X
Xiaoyu Wang 已提交
1325 1326
        return -1;
      }
wafwerar's avatar
wafwerar 已提交
1327
      SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
L
Liu Jicong 已提交
1328
      if (pParam == NULL) {
wafwerar's avatar
wafwerar 已提交
1329
        taosMemoryFree(pReq);
X
Xiaoyu Wang 已提交
1330
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
1331
        tsem_post(&tmq->rspSem);
X
Xiaoyu Wang 已提交
1332 1333
        return -1;
      }
L
Liu Jicong 已提交
1334 1335
      pParam->tmq = tmq;
      pParam->pVg = pVg;
L
Liu Jicong 已提交
1336
      pParam->pTopic = pTopic;
L
Liu Jicong 已提交
1337
      pParam->vgId = pVg->vgId;
L
Liu Jicong 已提交
1338 1339
      pParam->epoch = tmq->epoch;

wafwerar's avatar
wafwerar 已提交
1340
      SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo));
L
Liu Jicong 已提交
1341
      if (sendInfo == NULL) {
wafwerar's avatar
wafwerar 已提交
1342 1343
        taosMemoryFree(pReq);
        taosMemoryFree(pParam);
L
Liu Jicong 已提交
1344
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
1345
        tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
1346 1347 1348 1349
        return -1;
      }

      sendInfo->msgInfo = (SDataBuf){
X
Xiaoyu Wang 已提交
1350
          .pData = pReq,
L
Liu Jicong 已提交
1351
          .len = sizeof(SMqPollReq),
X
Xiaoyu Wang 已提交
1352 1353
          .handle = NULL,
      };
L
Liu Jicong 已提交
1354
      sendInfo->requestId = pReq->reqId;
X
Xiaoyu Wang 已提交
1355
      sendInfo->requestObjRefId = 0;
L
Liu Jicong 已提交
1356
      sendInfo->param = pParam;
X
Xiaoyu Wang 已提交
1357
      sendInfo->fp = tmqPollCb;
L
Liu Jicong 已提交
1358
      sendInfo->msgType = TDMT_VND_CONSUME;
X
Xiaoyu Wang 已提交
1359 1360

      int64_t transporterId = 0;
L
fix  
Liu Jicong 已提交
1361
      /*printf("send poll\n");*/
L
Liu Jicong 已提交
1362 1363
      tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %ld, reqId %lu", tmq->consumerId,
               pTopic->topicName, pVg->vgId, tmq->epoch, pVg->currentOffset, pReq->reqId);
L
fix  
Liu Jicong 已提交
1364
      /*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/
X
Xiaoyu Wang 已提交
1365 1366 1367 1368 1369 1370 1371 1372
      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
      pVg->pollCnt++;
      tmq->pollCnt++;
    }
  }
  return 0;
}

L
Liu Jicong 已提交
1373 1374
int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
L
fix  
Liu Jicong 已提交
1375
    /*printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);*/
L
Liu Jicong 已提交
1376 1377
    if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) {
      SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
L
Liu Jicong 已提交
1378
      SMqAskEpRsp*        rspMsg = &pEpRspWrapper->msg;
L
Liu Jicong 已提交
1379
      tmqUpdateEp(tmq, rspWrapper->epoch, rspMsg);
L
temp  
Liu Jicong 已提交
1380
      /*tmqClearUnhandleMsg(tmq);*/
X
Xiaoyu Wang 已提交
1381 1382 1383 1384 1385 1386 1387 1388 1389 1390
      *pReset = true;
    } else {
      *pReset = false;
    }
  } else {
    return -1;
  }
  return 0;
}

1391
SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t waitTime, bool pollIfReset) {
X
Xiaoyu Wang 已提交
1392
  while (1) {
L
Liu Jicong 已提交
1393 1394 1395
    SMqRspWrapper* rspWrapper = NULL;
    taosGetQitem(tmq->qall, (void**)&rspWrapper);
    if (rspWrapper == NULL) {
L
Liu Jicong 已提交
1396
      taosReadAllQitems(tmq->mqueue, tmq->qall);
L
Liu Jicong 已提交
1397 1398
      taosGetQitem(tmq->qall, (void**)&rspWrapper);
      if (rspWrapper == NULL) return NULL;
X
Xiaoyu Wang 已提交
1399 1400
    }

L
Liu Jicong 已提交
1401 1402
    if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
L
Liu Jicong 已提交
1403
      /*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/
L
fix  
Liu Jicong 已提交
1404
      /*printf("handle poll rsp %d\n", rspMsg->head.mqMsgType);*/
L
Liu Jicong 已提交
1405
      if (pollRspWrapper->msg.head.epoch == atomic_load_32(&tmq->epoch)) {
L
fix  
Liu Jicong 已提交
1406
        /*printf("epoch match\n");*/
L
Liu Jicong 已提交
1407
        SMqClientVg* pVg = pollRspWrapper->vgHandle;
L
Liu Jicong 已提交
1408
        /*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/
L
Liu Jicong 已提交
1409
        pVg->currentOffset = pollRspWrapper->msg.rspOffset;
X
Xiaoyu Wang 已提交
1410
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
L
Liu Jicong 已提交
1411
        if (pollRspWrapper->msg.blockNum == 0) {
L
Liu Jicong 已提交
1412 1413
          taosFreeQitem(pollRspWrapper);
          rspWrapper = NULL;
L
temp  
Liu Jicong 已提交
1414 1415
          continue;
        }
L
Liu Jicong 已提交
1416
        // build rsp
L
Liu Jicong 已提交
1417
        SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
L
Liu Jicong 已提交
1418
        taosFreeQitem(pollRspWrapper);
L
Liu Jicong 已提交
1419
        return pRsp;
X
Xiaoyu Wang 已提交
1420
      } else {
L
Liu Jicong 已提交
1421
        /*printf("epoch mismatch\n");*/
L
Liu Jicong 已提交
1422
        taosFreeQitem(pollRspWrapper);
X
Xiaoyu Wang 已提交
1423 1424
      }
    } else {
L
fix  
Liu Jicong 已提交
1425
      /*printf("handle ep rsp %d\n", rspMsg->head.mqMsgType);*/
X
Xiaoyu Wang 已提交
1426
      bool reset = false;
L
Liu Jicong 已提交
1427 1428
      tmqHandleNoPollRsp(tmq, rspWrapper, &reset);
      taosFreeQitem(rspWrapper);
X
Xiaoyu Wang 已提交
1429
      if (pollIfReset && reset) {
L
add log  
Liu Jicong 已提交
1430
        tscDebug("consumer %ld reset and repoll", tmq->consumerId);
1431
        tmqPollImpl(tmq, waitTime);
X
Xiaoyu Wang 已提交
1432 1433 1434 1435 1436
      }
    }
  }
}

1437
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) {
L
Liu Jicong 已提交
1438 1439
  SMqRspObj* rspObj;
  int64_t    startTime = taosGetTimestampMs();
L
Liu Jicong 已提交
1440

1441
  rspObj = tmqHandleAllRsp(tmq, wait_time, false);
L
Liu Jicong 已提交
1442 1443
  if (rspObj) {
    return (TAOS_RES*)rspObj;
L
fix  
Liu Jicong 已提交
1444
  }
X
Xiaoyu Wang 已提交
1445

L
Liu Jicong 已提交
1446
  // in no topic status also need process delayed task
L
Liu Jicong 已提交
1447
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
1448 1449 1450
    return NULL;
  }

X
Xiaoyu Wang 已提交
1451
  while (1) {
L
Liu Jicong 已提交
1452
    tmqHandleAllDelayedTask(tmq);
1453
    if (tmqPollImpl(tmq, wait_time) < 0) return NULL;
L
Liu Jicong 已提交
1454

1455
    rspObj = tmqHandleAllRsp(tmq, wait_time, false);
L
Liu Jicong 已提交
1456 1457
    if (rspObj) {
      return (TAOS_RES*)rspObj;
X
Xiaoyu Wang 已提交
1458
    }
1459
    if (wait_time != 0) {
X
Xiaoyu Wang 已提交
1460
      int64_t endTime = taosGetTimestampMs();
1461 1462
      int64_t leftTime = endTime - startTime;
      if (leftTime > wait_time) {
L
Liu Jicong 已提交
1463
        tscDebug("consumer %ld (epoch %d) timeout, no rsp", tmq->consumerId, tmq->epoch);
X
Xiaoyu Wang 已提交
1464 1465
        return NULL;
      }
1466
      tsem_timewait(&tmq->rspSem, leftTime * 1000);
L
Liu Jicong 已提交
1467 1468 1469
    } else {
      // use tsem_timewait instead of tsem_wait to avoid unexpected stuck
      tsem_timewait(&tmq->rspSem, 500 * 1000);
X
Xiaoyu Wang 已提交
1470 1471 1472 1473
    }
  }
}

L
Liu Jicong 已提交
1474
tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) {
1475
  if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
1476 1477 1478 1479 1480 1481 1482 1483 1484 1485
    tmq_resp_err_t rsp = tmq_commit_sync(tmq, NULL);
    if (rsp == TMQ_RESP_ERR__SUCCESS) {
      // TODO: free resources
      return TMQ_RESP_ERR__SUCCESS;
    } else {
      return TMQ_RESP_ERR__FAIL;
    }

    tmq_list_t* lst = tmq_list_new();
    rsp = tmq_subscribe(tmq, lst);
1486
    tmq_list_destroy(lst);
1487

1488 1489 1490 1491 1492 1493
    if (rsp == TMQ_RESP_ERR__SUCCESS) {
      // TODO: free resources
      return TMQ_RESP_ERR__SUCCESS;
    } else {
      return TMQ_RESP_ERR__FAIL;
    }
L
Liu Jicong 已提交
1494
  }
1495 1496
  // TODO: free resources
  return TMQ_RESP_ERR__SUCCESS;
1497
}
L
Liu Jicong 已提交
1498 1499 1500 1501 1502 1503 1504

const char* tmq_err2str(tmq_resp_err_t err) {
  if (err == TMQ_RESP_ERR__SUCCESS) {
    return "success";
  }
  return "fail";
}
L
Liu Jicong 已提交
1505

L
Liu Jicong 已提交
1506
const char* tmq_get_topic_name(TAOS_RES* res) {
L
Liu Jicong 已提交
1507 1508
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
L
Liu Jicong 已提交
1509
    return strchr(pRspObj->topic, '.') + 1;
L
Liu Jicong 已提交
1510 1511 1512 1513 1514 1515 1516 1517 1518 1519 1520 1521 1522
  } else {
    return NULL;
  }
}

int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
    return pRspObj->vgId;
  } else {
    return -1;
  }
}
L
Liu Jicong 已提交
1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535

const char* tmq_get_table_name(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
    if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 ||
        pRspObj->resIter >= pRspObj->rsp.blockNum) {
      return NULL;
    }
    const char* name = taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter);
    return name;
  }
  return NULL;
}
L
Liu Jicong 已提交
1536 1537 1538 1539 1540 1541 1542
DLL_EXPORT void tmq_commit_async(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, tmq_commit_cb* cb, void* param) {
  tmqCommitInner(tmq, offsets, 0, 1, cb, param);
}

DLL_EXPORT tmq_resp_err_t tmq_commit_sync(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets) {
  return tmqCommitInner(tmq, offsets, 0, 0, NULL, NULL);
}