tmq.c 43.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
H
Haojun Liao 已提交
19
#include "tdatablock.h"
20 21 22
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
X
Xiaoyu Wang 已提交
23
#include "tqueue.h"
24
#include "tref.h"
L
Liu Jicong 已提交
25 26
#include "ttimer.h"

L
Liu Jicong 已提交
27 28 29 30 31 32 33 34
int32_t tmqAskEp(tmq_t* tmq, bool async);

typedef struct {
  int8_t inited;
  tmr_h  timer;
} SMqMgmt;

static SMqMgmt tmqMgmt = {0};
35

L
Liu Jicong 已提交
36 37 38 39 40 41
typedef struct {
  int8_t  tmqRspType;
  int32_t epoch;
} SMqRspWrapper;

typedef struct {
L
Liu Jicong 已提交
42 43 44
  int8_t      tmqRspType;
  int32_t     epoch;
  SMqAskEpRsp msg;
L
Liu Jicong 已提交
45 46
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
47
struct tmq_list_t {
L
Liu Jicong 已提交
48
  SArray container;
L
Liu Jicong 已提交
49
};
L
Liu Jicong 已提交
50

L
Liu Jicong 已提交
51
struct tmq_topic_vgroup_t {
L
Liu Jicong 已提交
52
  SMqOffset offset;
L
Liu Jicong 已提交
53 54 55
};

struct tmq_topic_vgroup_list_t {
L
Liu Jicong 已提交
56
  SArray container;  // SArray<tmq_topic_vgroup_t*>
L
Liu Jicong 已提交
57 58 59
};

struct tmq_conf_t {
60 61 62 63 64 65 66 67 68 69 70
  char     clientId[256];
  char     groupId[TSDB_CGROUP_LEN];
  int8_t   autoCommit;
  int8_t   resetOffset;
  int8_t   withTbName;
  uint16_t port;
  int32_t  autoCommitInterval;
  char*    ip;
  char*    user;
  char*    pass;
  /*char*          db;*/
71
  tmq_commit_cb* commitCb;
L
Liu Jicong 已提交
72
  void*          commitCbUserParam;
L
Liu Jicong 已提交
73 74 75
};

struct tmq_t {
L
Liu Jicong 已提交
76
  // conf
L
Liu Jicong 已提交
77 78
  char           groupId[TSDB_CGROUP_LEN];
  char           clientId[256];
79
  int8_t         withTbName;
L
Liu Jicong 已提交
80
  int8_t         autoCommit;
L
Liu Jicong 已提交
81
  int32_t        autoCommitInterval;
L
Liu Jicong 已提交
82
  int32_t        resetOffsetCfg;
L
Liu Jicong 已提交
83
  int64_t        consumerId;
L
Liu Jicong 已提交
84 85
  tmq_commit_cb* commitCb;
  void*          commitCbUserParam;
L
Liu Jicong 已提交
86 87 88 89

  // status
  int8_t  status;
  int32_t epoch;
L
Liu Jicong 已提交
90 91
#if 0
  int8_t  epStatus;
L
Liu Jicong 已提交
92
  int32_t epSkipCnt;
L
Liu Jicong 已提交
93
#endif
L
Liu Jicong 已提交
94 95
  int64_t pollCnt;

L
Liu Jicong 已提交
96 97 98 99 100
  // timer
  tmr_h hbTimer;
  tmr_h reportTimer;
  tmr_h commitTimer;

L
Liu Jicong 已提交
101 102 103 104
  // connection
  STscObj* pTscObj;

  // container
L
Liu Jicong 已提交
105
  SArray*     clientTopics;  // SArray<SMqClientTopic>
L
Liu Jicong 已提交
106
  STaosQueue* mqueue;        // queue of rsp
L
Liu Jicong 已提交
107
  STaosQall*  qall;
L
Liu Jicong 已提交
108 109 110 111
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit

  // ctl
  tsem_t rspSem;
L
Liu Jicong 已提交
112 113
};

X
Xiaoyu Wang 已提交
114 115 116 117 118 119 120 121
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT,
};

enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY,
122
  TMQ_CONSUMER_STATUS__NO_TOPIC,
L
Liu Jicong 已提交
123 124
};

L
Liu Jicong 已提交
125 126 127 128 129 130
enum {
  TMQ_DELAYED_TASK__HB = 1,
  TMQ_DELAYED_TASK__REPORT,
  TMQ_DELAYED_TASK__COMMIT,
};

L
Liu Jicong 已提交
131
typedef struct {
132 133 134 135
  // statistics
  int64_t pollCnt;
  // offset
  int64_t currentOffset;
L
Liu Jicong 已提交
136
  // connection info
137
  int32_t vgId;
X
Xiaoyu Wang 已提交
138
  int32_t vgStatus;
L
Liu Jicong 已提交
139
  int32_t vgSkipCnt;
140 141 142
  SEpSet  epSet;
} SMqClientVg;

L
Liu Jicong 已提交
143
typedef struct {
144
  // subscribe info
L
Liu Jicong 已提交
145 146 147 148
  char* topicName;

  SArray* vgs;  // SArray<SMqClientVg>

L
Liu Jicong 已提交
149 150
  int8_t         isSchemaAdaptive;
  SSchemaWrapper schema;
151 152
} SMqClientTopic;

L
Liu Jicong 已提交
153 154 155 156 157
typedef struct {
  int8_t          tmqRspType;
  int32_t         epoch;
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
L
Liu Jicong 已提交
158
  SMqDataBlkRsp   msg;
L
Liu Jicong 已提交
159 160
} SMqPollRspWrapper;

L
Liu Jicong 已提交
161
typedef struct {
L
Liu Jicong 已提交
162 163 164 165
  tmq_t*         tmq;
  tsem_t         rspSem;
  tmq_resp_err_t rspErr;
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
166

L
Liu Jicong 已提交
167
typedef struct {
168
  tmq_t*  tmq;
L
Liu Jicong 已提交
169
  int32_t code;
L
Liu Jicong 已提交
170
  int32_t async;
X
Xiaoyu Wang 已提交
171
  tsem_t  rspSem;
172 173
} SMqAskEpCbParam;

L
Liu Jicong 已提交
174
typedef struct {
L
Liu Jicong 已提交
175 176
  tmq_t*          tmq;
  SMqClientVg*    pVg;
L
Liu Jicong 已提交
177
  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
178
  int32_t         epoch;
L
Liu Jicong 已提交
179
  int32_t         vgId;
L
Liu Jicong 已提交
180
  tsem_t          rspSem;
X
Xiaoyu Wang 已提交
181
} SMqPollCbParam;
182

L
Liu Jicong 已提交
183
typedef struct {
L
Liu Jicong 已提交
184
  tmq_t*         tmq;
L
Liu Jicong 已提交
185 186 187
  int8_t         async;
  int8_t         automatic;
  tmq_commit_cb* userCb;
L
Liu Jicong 已提交
188 189
  tsem_t         rspSem;
  tmq_resp_err_t rspErr;
L
Liu Jicong 已提交
190
  SArray*        offsets;
L
Liu Jicong 已提交
191
  void*          userParam;
L
Liu Jicong 已提交
192
} SMqCommitCbParam;
L
Liu Jicong 已提交
193

194
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
195
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
196
  conf->withTbName = -1;
L
Liu Jicong 已提交
197
  conf->autoCommit = true;
L
Liu Jicong 已提交
198
  conf->autoCommitInterval = 5000;
X
Xiaoyu Wang 已提交
199
  conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
200 201 202
  return conf;
}

L
Liu Jicong 已提交
203
void tmq_conf_destroy(tmq_conf_t* conf) {
wafwerar's avatar
wafwerar 已提交
204
  if (conf) taosMemoryFree(conf);
L
Liu Jicong 已提交
205 206 207
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
208 209
  if (strcmp(key, "group.id") == 0) {
    strcpy(conf->groupId, value);
L
Liu Jicong 已提交
210
    return TMQ_CONF_OK;
211
  }
L
Liu Jicong 已提交
212

213 214
  if (strcmp(key, "client.id") == 0) {
    strcpy(conf->clientId, value);
L
Liu Jicong 已提交
215 216
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
217

L
Liu Jicong 已提交
218 219
  if (strcmp(key, "enable.auto.commit") == 0) {
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
220
      conf->autoCommit = true;
L
Liu Jicong 已提交
221 222
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
223
      conf->autoCommit = false;
L
Liu Jicong 已提交
224 225 226 227
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
228
  }
L
Liu Jicong 已提交
229

L
Liu Jicong 已提交
230 231 232 233 234
  if (strcmp(key, "auto.commit.interval.ms") == 0) {
    conf->autoCommitInterval = atoi(value);
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
235 236 237 238 239 240 241 242 243 244 245 246 247 248
  if (strcmp(key, "auto.offset.reset") == 0) {
    if (strcmp(value, "none") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "earliest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
249

250 251 252
  if (strcmp(key, "msg.with.table.name") == 0) {
    if (strcmp(value, "true") == 0) {
      conf->withTbName = 1;
L
Liu Jicong 已提交
253
      return TMQ_CONF_OK;
254 255
    } else if (strcmp(value, "false") == 0) {
      conf->withTbName = 0;
L
Liu Jicong 已提交
256
      return TMQ_CONF_OK;
257 258
    } else if (strcmp(value, "none") == 0) {
      conf->withTbName = -1;
L
Liu Jicong 已提交
259
      return TMQ_CONF_OK;
260 261 262 263 264
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
265
  if (strcmp(key, "td.connect.ip") == 0) {
L
Liu Jicong 已提交
266 267 268
    conf->ip = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
269
  if (strcmp(key, "td.connect.user") == 0) {
L
Liu Jicong 已提交
270 271 272
    conf->user = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
273
  if (strcmp(key, "td.connect.pass") == 0) {
L
Liu Jicong 已提交
274 275 276
    conf->pass = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
277
  if (strcmp(key, "td.connect.port") == 0) {
L
Liu Jicong 已提交
278 279 280
    conf->port = atoi(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
281
  if (strcmp(key, "td.connect.db") == 0) {
282
    /*conf->db = strdup(value);*/
L
Liu Jicong 已提交
283 284 285
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
286
  return TMQ_CONF_UNKNOWN;
287 288 289
}

tmq_list_t* tmq_list_new() {
L
Liu Jicong 已提交
290 291
  //
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
292 293
}

L
Liu Jicong 已提交
294 295 296
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  SArray* container = &list->container;
  char*   topic = strdup(src);
L
fix  
Liu Jicong 已提交
297
  if (taosArrayPush(container, &topic) == NULL) return -1;
298 299 300
  return 0;
}

L
Liu Jicong 已提交
301
void tmq_list_destroy(tmq_list_t* list) {
L
Liu Jicong 已提交
302
  SArray* container = &list->container;
L
Liu Jicong 已提交
303
  taosArrayDestroyP(container, taosMemoryFree);
L
Liu Jicong 已提交
304 305
}

L
Liu Jicong 已提交
306 307 308 309 310 311 312 313 314 315
int32_t tmq_list_get_size(const tmq_list_t* list) {
  const SArray* container = &list->container;
  return taosArrayGetSize(container);
}

char** tmq_list_to_c_array(const tmq_list_t* list) {
  const SArray* container = &list->container;
  return container->pData;
}

L
Liu Jicong 已提交
316 317 318 319
static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
  return sprintf(dst, "%s:%d", topicName, vg);
}

L
Liu Jicong 已提交
320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448
int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
  SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
  pParam->rspErr = code == 0 ? TMQ_RESP_ERR__SUCCESS : TMQ_RESP_ERR__FAIL;
  if (pParam->async) {
    if (pParam->automatic && pParam->tmq->commitCb) {
      pParam->tmq->commitCb(pParam->tmq, pParam->rspErr, (tmq_topic_vgroup_list_t*)pParam->offsets,
                            pParam->tmq->commitCbUserParam);
    } else if (!pParam->automatic && pParam->userCb) {
      pParam->userCb(pParam->tmq, pParam->rspErr, (tmq_topic_vgroup_list_t*)pParam->offsets, pParam->userParam);
    }

    if (pParam->offsets) {
      taosArrayDestroy(pParam->offsets);
    }

    taosMemoryFree(pParam);
  } else {
    tsem_post(&pParam->rspSem);
  }
  return 0;
}

int32_t tmqCommitInner(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8_t automatic, int8_t async,
                       tmq_commit_cb* userCb, void* userParam) {
  SMqCMCommitOffsetReq req;
  SArray*              pOffsets = NULL;
  void*                buf = NULL;
  SMqCommitCbParam*    pParam = NULL;
  SMsgSendInfo*        sendInfo = NULL;

  if (offsets == NULL) {
    pOffsets = taosArrayInit(0, sizeof(SMqOffset));
    for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
      SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
      for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
        SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
        SMqOffset    offset;
        tstrncpy(offset.topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
        tstrncpy(offset.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
        offset.vgId = pVg->vgId;
        offset.offset = pVg->currentOffset;
        taosArrayPush(pOffsets, &offset);
      }
    }
  } else {
    pOffsets = (SArray*)&offsets->container;
  }

  req.num = (int32_t)pOffsets->size;
  req.offsets = pOffsets->pData;

  int32_t code;
  int32_t tlen = 0;
  tEncodeSize(tEncodeSMqCMCommitOffsetReq, &req, tlen, code);
  if (code < 0) {
    goto END;
  }
  code = -1;

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    goto END;
  }
  SEncoder encoder;
  tEncoderInit(&encoder, buf, tlen);
  tEncodeSMqCMCommitOffsetReq(&encoder, &req);
  tEncoderClear(&encoder);

  pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
  if (pParam == NULL) {
    goto END;
  }
  pParam->tmq = tmq;
  pParam->automatic = automatic;
  pParam->async = async;
  pParam->offsets = pOffsets;
  pParam->userCb = userCb;
  pParam->userParam = userParam;
  if (!async) tsem_init(&pParam->rspSem, 0, 0);

  sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo));
  if (sendInfo == NULL) goto END;
  sendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqCommitCb;
  sendInfo->msgType = TDMT_MND_MQ_COMMIT_OFFSET;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  if (!async) {
    tsem_wait(&pParam->rspSem);
    code = pParam->rspErr;
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  }

  // avoid double free if msg is sent
  buf = NULL;

  code = 0;
END:
  if (buf) taosMemoryFree(buf);
  if (pParam) taosMemoryFree(pParam);
  if (sendInfo) taosMemoryFree(sendInfo);

  if (code != 0 && async) {
    if (automatic) {
      tmq->commitCb(tmq, TMQ_RESP_ERR__FAIL, (tmq_topic_vgroup_list_t*)pOffsets, tmq->commitCbUserParam);
    } else {
      userCb(tmq, TMQ_RESP_ERR__FAIL, (tmq_topic_vgroup_list_t*)pOffsets, userParam);
    }
  }

  if (offsets == NULL) {
    taosArrayDestroy(pOffsets);
  }
  return code;
}

L
Liu Jicong 已提交
449 450
void tmqAssignDelayedHbTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
451
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
L
Liu Jicong 已提交
452 453
  *pTaskType = TMQ_DELAYED_TASK__HB;
  taosWriteQitem(tmq->delayedTask, pTaskType);
454
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
455 456 457 458
}

void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
459
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
L
Liu Jicong 已提交
460 461
  *pTaskType = TMQ_DELAYED_TASK__COMMIT;
  taosWriteQitem(tmq->delayedTask, pTaskType);
462
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
463 464 465 466
}

void tmqAssignDelayedReportTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
467
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
L
Liu Jicong 已提交
468 469
  *pTaskType = TMQ_DELAYED_TASK__REPORT;
  taosWriteQitem(tmq->delayedTask, pTaskType);
470
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
471 472 473 474 475 476 477 478 479 480 481
}

int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
  STaosQall* qall = taosAllocateQall();
  taosReadAllQitems(tmq->delayedTask, qall);
  while (1) {
    int8_t* pTaskType = NULL;
    taosGetQitem(qall, (void**)&pTaskType);
    if (pTaskType == NULL) break;

    if (*pTaskType == TMQ_DELAYED_TASK__HB) {
L
Liu Jicong 已提交
482
      tmqAskEp(tmq, true);
L
Liu Jicong 已提交
483 484
      taosTmrReset(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer, &tmq->hbTimer);
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
L
Liu Jicong 已提交
485 486
      /*tmq_commit(tmq, NULL, true);*/
      tmqCommitInner(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam);
L
Liu Jicong 已提交
487 488 489 490 491 492 493 494 495 496
      taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer, &tmq->commitTimer);
    } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
    } else {
      ASSERT(0);
    }
  }
  taosFreeQall(qall);
  return 0;
}

L
Liu Jicong 已提交
497
void tmqClearUnhandleMsg(tmq_t* tmq) {
L
Liu Jicong 已提交
498
  SMqRspWrapper* msg = NULL;
L
Liu Jicong 已提交
499 500 501 502 503 504 505 506
  while (1) {
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg)
      taosFreeQitem(msg);
    else
      break;
  }

L
fix  
Liu Jicong 已提交
507
  msg = NULL;
L
Liu Jicong 已提交
508 509 510 511 512 513 514 515 516 517
  taosReadAllQitems(tmq->mqueue, tmq->qall);
  while (1) {
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg)
      taosFreeQitem(msg);
    else
      break;
  }
}

L
Liu Jicong 已提交
518 519 520
int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) {
  SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
  pParam->rspErr = code;
L
Liu Jicong 已提交
521
  /*tmq_t* tmq = pParam->tmq;*/
L
Liu Jicong 已提交
522 523 524
  tsem_post(&pParam->rspSem);
  return 0;
}
525

X
Xiaoyu Wang 已提交
526 527 528 529 530
tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  if (*topics == NULL) {
    *topics = tmq_list_new();
  }
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
L
Liu Jicong 已提交
531
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
L
Liu Jicong 已提交
532
    tmq_list_append(*topics, strchr(topic->topicName, '.') + 1);
X
Xiaoyu Wang 已提交
533 534 535 536 537
  }
  return TMQ_RESP_ERR__SUCCESS;
}

tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq) {
L
Liu Jicong 已提交
538 539 540 541
  tmq_list_t*    lst = tmq_list_new();
  tmq_resp_err_t rsp = tmq_subscribe(tmq, lst);
  tmq_list_destroy(lst);
  return rsp;
X
Xiaoyu Wang 已提交
542 543
}

L
Liu Jicong 已提交
544
#if 0
545
tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
wafwerar's avatar
wafwerar 已提交
546
  tmq_t* pTmq = taosMemoryCalloc(sizeof(tmq_t), 1);
547 548 549 550 551 552
  if (pTmq == NULL) {
    return NULL;
  }
  pTmq->pTscObj = (STscObj*)conn;
  pTmq->status = 0;
  pTmq->pollCnt = 0;
L
Liu Jicong 已提交
553
  pTmq->epoch = 0;
L
fix txn  
Liu Jicong 已提交
554
  pTmq->epStatus = 0;
L
temp  
Liu Jicong 已提交
555
  pTmq->epSkipCnt = 0;
L
Liu Jicong 已提交
556
  // set conf
557 558
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
L
Liu Jicong 已提交
559
  pTmq->autoCommit = conf->autoCommit;
560
  pTmq->commit_cb = conf->commit_cb;
L
Liu Jicong 已提交
561
  pTmq->resetOffsetCfg = conf->resetOffset;
L
Liu Jicong 已提交
562

L
Liu Jicong 已提交
563 564 565 566 567 568 569 570 571 572
  pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1);
  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  if (pTmq->clientTopics == NULL) {
    taosMemoryFree(pTmq);
    return NULL;
  }

  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();

L
Liu Jicong 已提交
573 574
  tsem_init(&pTmq->rspSem, 0, 0);

L
Liu Jicong 已提交
575 576
  return pTmq;
}
L
Liu Jicong 已提交
577
#endif
L
Liu Jicong 已提交
578

L
Liu Jicong 已提交
579
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
L
Liu Jicong 已提交
580 581 582 583 584 585 586 587 588 589 590
  // init timer
  int8_t inited = atomic_val_compare_exchange_8(&tmqMgmt.inited, 0, 1);
  if (inited == 0) {
    tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
    if (tmqMgmt.timer == NULL) {
      atomic_store_8(&tmqMgmt.inited, 0);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
  }

L
Liu Jicong 已提交
591 592 593 594
  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    return NULL;
  }
L
Liu Jicong 已提交
595

L
Liu Jicong 已提交
596 597 598 599 600
  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  ASSERT(user);
  ASSERT(pass);
L
Liu Jicong 已提交
601
  ASSERT(conf->groupId[0]);
L
Liu Jicong 已提交
602

L
Liu Jicong 已提交
603 604 605 606 607 608 609 610
  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();
  pTmq->delayedTask = taosOpenQueue();

  if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
    goto FAIL;
  }
L
Liu Jicong 已提交
611

L
Liu Jicong 已提交
612 613
  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
L
Liu Jicong 已提交
614 615
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;
L
Liu Jicong 已提交
616 617
  /*pTmq->epStatus = 0;*/
  /*pTmq->epSkipCnt = 0;*/
L
Liu Jicong 已提交
618

L
Liu Jicong 已提交
619 620 621
  // set conf
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
622
  pTmq->withTbName = conf->withTbName;
L
Liu Jicong 已提交
623
  pTmq->autoCommit = conf->autoCommit;
L
Liu Jicong 已提交
624
  pTmq->autoCommitInterval = conf->autoCommitInterval;
L
Liu Jicong 已提交
625 626
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
L
Liu Jicong 已提交
627 628
  pTmq->resetOffsetCfg = conf->resetOffset;

L
Liu Jicong 已提交
629
  // assign consumerId
L
Liu Jicong 已提交
630
  pTmq->consumerId = tGenIdPI64();
X
Xiaoyu Wang 已提交
631

L
Liu Jicong 已提交
632 633 634 635
  // init semaphore
  if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
    goto FAIL;
  }
L
Liu Jicong 已提交
636

L
Liu Jicong 已提交
637 638 639 640 641 642
  // init connection
  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) {
    tsem_destroy(&pTmq->rspSem);
    goto FAIL;
  }
L
Liu Jicong 已提交
643

644
  return pTmq;
L
Liu Jicong 已提交
645 646 647 648 649 650 651 652

FAIL:
  if (pTmq->clientTopics) taosArrayDestroy(pTmq->clientTopics);
  if (pTmq->mqueue) taosCloseQueue(pTmq->mqueue);
  if (pTmq->delayedTask) taosCloseQueue(pTmq->delayedTask);
  if (pTmq->qall) taosFreeQall(pTmq->qall);
  taosMemoryFree(pTmq);
  return NULL;
653 654
}

L
Liu Jicong 已提交
655
tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) {
L
Liu Jicong 已提交
656 657
  return tmqCommitInner(tmq, offsets, 0, async, tmq->commitCb, tmq->commitCbUserParam);
#if 0
L
Liu Jicong 已提交
658 659 660
  // TODO: add read write lock
  SRequestObj*   pRequest = NULL;
  tmq_resp_err_t resp = TMQ_RESP_ERR__SUCCESS;
L
Liu Jicong 已提交
661 662
  // build msg
  // send to mnode
L
Liu Jicong 已提交
663
  SMqCMCommitOffsetReq req;
L
Liu Jicong 已提交
664
  SArray*              pOffsets = NULL;
L
Liu Jicong 已提交
665 666

  if (offsets == NULL) {
L
Liu Jicong 已提交
667
    pOffsets = taosArrayInit(0, sizeof(SMqOffset));
L
Liu Jicong 已提交
668 669 670 671 672 673 674 675 676
    for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
      SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
      for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
        SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
        SMqOffset    offset;
        strcpy(offset.topicName, pTopic->topicName);
        strcpy(offset.cgroup, tmq->groupId);
        offset.vgId = pVg->vgId;
        offset.offset = pVg->currentOffset;
L
Liu Jicong 已提交
677
        taosArrayPush(pOffsets, &offset);
L
Liu Jicong 已提交
678 679
      }
    }
L
Liu Jicong 已提交
680 681
    req.num = pOffsets->size;
    req.offsets = pOffsets->pData;
L
Liu Jicong 已提交
682
  } else {
L
Liu Jicong 已提交
683 684
    req.num = taosArrayGetSize(&offsets->container);
    req.offsets = (SMqOffset*)offsets->container.pData;
L
Liu Jicong 已提交
685
  }
L
Liu Jicong 已提交
686

H
Hongze Cheng 已提交
687
  SEncoder encoder;
L
Liu Jicong 已提交
688

H
Hongze Cheng 已提交
689
  tEncoderInit(&encoder, NULL, 0);
L
Liu Jicong 已提交
690
  tEncodeSMqCMCommitOffsetReq(&encoder, &req);
L
Liu Jicong 已提交
691
  int32_t tlen = encoder.pos;
wafwerar's avatar
wafwerar 已提交
692
  void*   buf = taosMemoryMalloc(tlen);
L
Liu Jicong 已提交
693
  if (buf == NULL) {
H
Hongze Cheng 已提交
694
    tEncoderClear(&encoder);
L
Liu Jicong 已提交
695 696
    return -1;
  }
H
Hongze Cheng 已提交
697
  tEncoderClear(&encoder);
L
Liu Jicong 已提交
698

H
Hongze Cheng 已提交
699
  tEncoderInit(&encoder, buf, tlen);
L
Liu Jicong 已提交
700
  tEncodeSMqCMCommitOffsetReq(&encoder, &req);
H
Hongze Cheng 已提交
701
  tEncoderClear(&encoder);
L
Liu Jicong 已提交
702

L
Liu Jicong 已提交
703
  pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_MQ_COMMIT_OFFSET);
L
Liu Jicong 已提交
704 705 706 707
  if (pRequest == NULL) {
    tscError("failed to malloc request");
  }

L
Liu Jicong 已提交
708
  SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
L
Liu Jicong 已提交
709 710 711 712 713
  if (pParam == NULL) {
    return -1;
  }
  pParam->tmq = tmq;
  tsem_init(&pParam->rspSem, 0, 0);
L
fix  
Liu Jicong 已提交
714
  pParam->async = async;
L
Liu Jicong 已提交
715
  pParam->offsets = pOffsets;
L
Liu Jicong 已提交
716

X
Xiaoyu Wang 已提交
717 718 719 720 721
  pRequest->body.requestMsg = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };
L
Liu Jicong 已提交
722 723

  SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
L
Liu Jicong 已提交
724
  sendInfo->requestObjRefId = 0;
L
Liu Jicong 已提交
725 726
  sendInfo->param = pParam;
  sendInfo->fp = tmqCommitCb;
L
Liu Jicong 已提交
727 728 729 730 731
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

L
Liu Jicong 已提交
732 733 734
  if (!async) {
    tsem_wait(&pParam->rspSem);
    resp = pParam->rspErr;
L
Liu Jicong 已提交
735 736
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
L
fix  
Liu Jicong 已提交
737

L
Liu Jicong 已提交
738 739
    if (pOffsets) {
      taosArrayDestroy(pOffsets);
L
Liu Jicong 已提交
740
    }
L
Liu Jicong 已提交
741 742 743
  }

  return resp;
L
Liu Jicong 已提交
744
#endif
L
Liu Jicong 已提交
745 746
}

L
Liu Jicong 已提交
747 748 749 750 751 752
tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
  SCMSubscribeReq req = {0};
  int32_t         code = -1;
753 754

  req.consumerId = tmq->consumerId;
L
Liu Jicong 已提交
755
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
756
  req.topicNames = taosArrayInit(sz, sizeof(void*));
L
Liu Jicong 已提交
757
  if (req.topicNames == NULL) goto FAIL;
758

L
Liu Jicong 已提交
759 760
  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);
761 762

    SName name = {0};
L
Liu Jicong 已提交
763
    tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));
764

L
Liu Jicong 已提交
765 766 767
    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      goto FAIL;
768
    }
L
Liu Jicong 已提交
769
    tNameExtractFullName(&name, topicFName);
770

L
Liu Jicong 已提交
771 772 773
    tscDebug("subscribe topic: %s", topicFName);

    taosArrayPush(req.topicNames, &topicFName);
774 775
  }

L
Liu Jicong 已提交
776 777 778 779
  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto FAIL;

780 781 782
  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);

L
Liu Jicong 已提交
783 784
  SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo));
  if (sendInfo == NULL) goto FAIL;
785

X
Xiaoyu Wang 已提交
786 787 788 789
  SMqSubscribeCbParam param = {
      .rspErr = TMQ_RESP_ERR__SUCCESS,
      .tmq = tmq,
  };
L
Liu Jicong 已提交
790

L
Liu Jicong 已提交
791 792 793
  if (tsem_init(&param.rspSem, 0, 0) != 0) goto FAIL;

  sendInfo->msgInfo = (SDataBuf){
X
Xiaoyu Wang 已提交
794 795 796 797
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };
798

L
Liu Jicong 已提交
799 800
  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
L
Liu Jicong 已提交
801 802
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
L
Liu Jicong 已提交
803 804
  sendInfo->msgType = TDMT_MND_SUBSCRIBE;

805 806 807 808 809
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

L
Liu Jicong 已提交
810 811 812
  // avoid double free if msg is sent
  buf = NULL;

L
Liu Jicong 已提交
813 814
  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);
815

L
Liu Jicong 已提交
816 817 818
  code = param.rspErr;
  if (code != 0) goto FAIL;

L
Liu Jicong 已提交
819
  while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
L
fix  
Liu Jicong 已提交
820
    tscDebug("consumer not ready, retry");
L
Liu Jicong 已提交
821 822
    taosMsleep(500);
  }
823

L
Liu Jicong 已提交
824 825 826 827 828 829 830 831
  // init hb timer
  tmq->hbTimer = taosTmrStart(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer);

  // init auto commit timer
  if (tmq->autoCommit) {
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer);
  }

L
Liu Jicong 已提交
832 833 834
  code = 0;
FAIL:
  if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree);
L
Liu Jicong 已提交
835
  if (code != 0 && buf) {
L
Liu Jicong 已提交
836 837 838
    taosMemoryFree(buf);
  }
  return code;
839 840
}

L
Liu Jicong 已提交
841
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
L
Liu Jicong 已提交
842
  //
843
  conf->commitCb = cb;
L
Liu Jicong 已提交
844
  conf->commitCbUserParam = param;
L
Liu Jicong 已提交
845
}
846

847
#if 0
L
Liu Jicong 已提交
848 849 850 851 852 853 854 855 856 857 858 859 860 861 862
TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbName, const char* sql) {
  STscObj*     pTscObj = (STscObj*)taos;
  SRequestObj* pRequest = NULL;
  SQuery*      pQueryNode = NULL;
  char*        astStr = NULL;
  int32_t      sqlLen;

  terrno = TSDB_CODE_SUCCESS;
  if (taos == NULL || streamName == NULL || sql == NULL) {
    tscError("invalid parameters for creating stream, connObj:%p, stream name:%s, sql:%s", taos, streamName, sql);
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    goto _return;
  }
  sqlLen = strlen(sql);

L
Liu Jicong 已提交
863 864
  if (strlen(tbName) >= TSDB_TABLE_NAME_LEN) {
    tscError("output tb name too long, max length:%d", TSDB_TABLE_NAME_LEN - 1);
L
Liu Jicong 已提交
865 866 867 868 869 870 871 872 873 874 875 876
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    goto _return;
  }

  if (sqlLen > TSDB_MAX_ALLOWED_SQL_LEN) {
    tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    goto _return;
  }

  tscDebug("start to create stream: %s", streamName);

H
Haojun Liao 已提交
877
  int32_t code = 0;
L
Liu Jicong 已提交
878
  CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
D
stmt  
dapan1121 已提交
879
  CHECK_CODE_GOTO(parseSql(pRequest, false, &pQueryNode, NULL), _return);
L
Liu Jicong 已提交
880 881 882 883 884 885 886 887 888 889 890 891 892 893
  CHECK_CODE_GOTO(nodesNodeToString(pQueryNode->pRoot, false, &astStr, NULL), _return);

  /*printf("%s\n", pStr);*/

  SName name = {.acctId = pTscObj->acctId, .type = TSDB_TABLE_NAME_T};
  strcpy(name.dbname, pRequest->pDb);
  strcpy(name.tname, streamName);

  SCMCreateStreamReq req = {
      .igExists = 1,
      .ast = astStr,
      .sql = (char*)sql,
  };
  tNameExtractFullName(&name, req.name);
L
Liu Jicong 已提交
894
  strcpy(req.targetStbFullName, tbName);
L
Liu Jicong 已提交
895 896

  int   tlen = tSerializeSCMCreateStreamReq(NULL, 0, &req);
wafwerar's avatar
wafwerar 已提交
897
  void* buf = taosMemoryMalloc(tlen);
L
Liu Jicong 已提交
898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919
  if (buf == NULL) {
    goto _return;
  }

  tSerializeSCMCreateStreamReq(buf, tlen, &req);

  pRequest->body.requestMsg = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };
  pRequest->type = TDMT_MND_CREATE_STREAM;

  SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
  SEpSet        epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  tsem_wait(&pRequest->body.rspSem);

_return:
wafwerar's avatar
wafwerar 已提交
920
  taosMemoryFreeClear(astStr);
L
Liu Jicong 已提交
921 922 923 924 925 926 927 928 929 930 931
  qDestroyQuery(pQueryNode);
  /*if (sendInfo != NULL) {*/
  /*destroySendMsgInfo(sendInfo);*/
  /*}*/

  if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
    pRequest->code = terrno;
  }

  return pRequest;
}
932
#endif
L
Liu Jicong 已提交
933

L
Liu Jicong 已提交
934
#if 0
L
Liu Jicong 已提交
935 936
int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) {
  if (tmq_message == NULL) return 0;
L
Liu Jicong 已提交
937
  SMqPollRsp* pRsp = &tmq_message->msg;
L
Liu Jicong 已提交
938 939
  return pRsp->skipLogNum;
}
L
Liu Jicong 已提交
940
#endif
L
Liu Jicong 已提交
941 942

int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
X
Xiaoyu Wang 已提交
943 944
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;
  SMqClientVg*    pVg = pParam->pVg;
L
Liu Jicong 已提交
945
  SMqClientTopic* pTopic = pParam->pTopic;
X
Xiaoyu Wang 已提交
946
  tmq_t*          tmq = pParam->tmq;
L
Liu Jicong 已提交
947
  if (code != 0) {
L
Liu Jicong 已提交
948
    tscWarn("msg discard from vg %d, epoch %d, code:%x", pParam->vgId, pParam->epoch, code);
L
fix txn  
Liu Jicong 已提交
949
    goto CREATE_MSG_FAIL;
L
Liu Jicong 已提交
950 951
  }

X
Xiaoyu Wang 已提交
952 953 954
  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch < tmqEpoch) {
L
Liu Jicong 已提交
955
    // do not write into queue since updating epoch reset
L
Liu Jicong 已提交
956 957
    tscWarn("msg discard from vg %d since from earlier epoch, rsp epoch %d, current epoch %d", pParam->vgId, msgEpoch,
            tmqEpoch);
958
    tsem_post(&tmq->rspSem);
X
Xiaoyu Wang 已提交
959 960 961 962
    return 0;
  }

  if (msgEpoch != tmqEpoch) {
L
Liu Jicong 已提交
963
    tscWarn("mismatch rsp from vg %d, epoch %d, current epoch %d", pParam->vgId, msgEpoch, tmqEpoch);
X
Xiaoyu Wang 已提交
964 965
  }

966
  SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
L
Liu Jicong 已提交
967
  if (pRspWrapper == NULL) {
L
Liu Jicong 已提交
968
    tscWarn("msg discard from vg %d, epoch %d since out of memory", pParam->vgId, pParam->epoch);
L
fix txn  
Liu Jicong 已提交
969
    goto CREATE_MSG_FAIL;
L
Liu Jicong 已提交
970
  }
L
Liu Jicong 已提交
971

L
Liu Jicong 已提交
972 973 974
  pRspWrapper->tmqRspType = TMQ_MSG_TYPE__POLL_RSP;
  pRspWrapper->vgHandle = pVg;
  pRspWrapper->topicHandle = pTopic;
L
Liu Jicong 已提交
975

L
Liu Jicong 已提交
976 977
  memcpy(&pRspWrapper->msg, pMsg->pData, sizeof(SMqRspHead));

L
Liu Jicong 已提交
978
  tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->msg);
L
Liu Jicong 已提交
979

L
Liu Jicong 已提交
980 981
  tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld", tmq->consumerId, pVg->vgId,
           pRspWrapper->msg.reqOffset, pRspWrapper->msg.rspOffset);
L
fix  
Liu Jicong 已提交
982

L
Liu Jicong 已提交
983
  taosWriteQitem(tmq->mqueue, pRspWrapper);
984
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
985

L
Liu Jicong 已提交
986
  return 0;
L
fix txn  
Liu Jicong 已提交
987
CREATE_MSG_FAIL:
L
Liu Jicong 已提交
988 989 990
  if (pParam->epoch == tmq->epoch) {
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }
991
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
992
  return -1;
993 994
}

L
Liu Jicong 已提交
995
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
L
Liu Jicong 已提交
996
  /*printf("call update ep %d\n", epoch);*/
X
Xiaoyu Wang 已提交
997
  bool    set = false;
L
Liu Jicong 已提交
998 999
  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];
L
Liu Jicong 已提交
1000 1001
  tscDebug("consumer %ld update ep epoch %d to epoch %d, topic num: %d", tmq->consumerId, tmq->epoch, epoch,
           topicNumGet);
L
Liu Jicong 已提交
1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013
  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    return false;
  }
  SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pHash == NULL) {
    taosArrayDestroy(newTopics);
    return false;
  }

  // find topic, build hash
  for (int32_t i = 0; i < topicNumGet; i++) {
X
Xiaoyu Wang 已提交
1014 1015
    SMqClientTopic topic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
L
Liu Jicong 已提交
1016
    topic.schema = pTopicEp->schema;
L
Liu Jicong 已提交
1017
    taosHashClear(pHash);
X
Xiaoyu Wang 已提交
1018
    topic.topicName = strdup(pTopicEp->topic);
L
Liu Jicong 已提交
1019

L
Liu Jicong 已提交
1020
    tscDebug("consumer %ld update topic: %s", tmq->consumerId, topic.topicName);
L
Liu Jicong 已提交
1021 1022 1023 1024 1025 1026
    int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
    for (int32_t j = 0; j < topicNumCur; j++) {
      // find old topic
      SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j);
      if (pTopicCur->vgs && strcmp(pTopicCur->topicName, pTopicEp->topic) == 0) {
        int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
L
Liu Jicong 已提交
1027
        tscDebug("consumer %ld new vg num: %d", tmq->consumerId, vgNumCur);
L
Liu Jicong 已提交
1028 1029 1030 1031
        if (vgNumCur == 0) break;
        for (int32_t k = 0; k < vgNumCur; k++) {
          SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, k);
          sprintf(vgKey, "%s:%d", topic.topicName, pVgCur->vgId);
L
Liu Jicong 已提交
1032
          tscDebug("consumer %ld epoch %d vg %d build %s", tmq->consumerId, epoch, pVgCur->vgId, vgKey);
L
Liu Jicong 已提交
1033 1034 1035 1036 1037 1038 1039 1040 1041
          taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t));
        }
        break;
      }
    }

    int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
    topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
    for (int32_t j = 0; j < vgNumGet; j++) {
X
Xiaoyu Wang 已提交
1042
      SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
L
Liu Jicong 已提交
1043 1044 1045
      sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId);
      int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
      int64_t  offset = pVgEp->offset;
L
Liu Jicong 已提交
1046
      tscDebug("consumer %ld epoch %d vg %d offset og to %ld", tmq->consumerId, epoch, pVgEp->vgId, offset);
L
Liu Jicong 已提交
1047 1048
      if (pOffset != NULL) {
        offset = *pOffset;
L
Liu Jicong 已提交
1049
        tscDebug("consumer %ld epoch %d vg %d found %s", tmq->consumerId, epoch, pVgEp->vgId, vgKey);
L
Liu Jicong 已提交
1050
      }
L
Liu Jicong 已提交
1051
      tscDebug("consumer %ld epoch %d vg %d offset set to %ld", tmq->consumerId, epoch, pVgEp->vgId, offset);
X
Xiaoyu Wang 已提交
1052 1053
      SMqClientVg clientVg = {
          .pollCnt = 0,
L
Liu Jicong 已提交
1054
          .currentOffset = offset,
X
Xiaoyu Wang 已提交
1055 1056 1057
          .vgId = pVgEp->vgId,
          .epSet = pVgEp->epSet,
          .vgStatus = TMQ_VG_STATUS__IDLE,
L
Liu Jicong 已提交
1058
          .vgSkipCnt = 0,
X
Xiaoyu Wang 已提交
1059 1060 1061 1062
      };
      taosArrayPush(topic.vgs, &clientVg);
      set = true;
    }
L
Liu Jicong 已提交
1063
    taosArrayPush(newTopics, &topic);
X
Xiaoyu Wang 已提交
1064
  }
L
Liu Jicong 已提交
1065
  if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
L
Liu Jicong 已提交
1066
  taosHashCleanup(pHash);
L
Liu Jicong 已提交
1067
  tmq->clientTopics = newTopics;
1068 1069 1070 1071 1072 1073

  if (taosArrayGetSize(tmq->clientTopics) == 0)
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
  else
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);

X
Xiaoyu Wang 已提交
1074 1075 1076 1077
  atomic_store_32(&tmq->epoch, epoch);
  return set;
}

L
Liu Jicong 已提交
1078
int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
1079
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
L
Liu Jicong 已提交
1080
  tmq_t*           tmq = pParam->tmq;
L
Liu Jicong 已提交
1081
  pParam->code = code;
1082
  if (code != 0) {
L
Liu Jicong 已提交
1083
    tscError("consumer %ld get topic endpoint error, not ready, wait:%d", tmq->consumerId, pParam->async);
L
Liu Jicong 已提交
1084
    goto END;
1085
  }
L
Liu Jicong 已提交
1086

L
Liu Jicong 已提交
1087
  // tmq's epoch is monotonically increase,
L
Liu Jicong 已提交
1088
  // so it's safe to discard any old epoch msg.
L
Liu Jicong 已提交
1089
  // Epoch will only increase when received newer epoch ep msg
L
Liu Jicong 已提交
1090 1091
  SMqRspHead* head = pMsg->pData;
  int32_t     epoch = atomic_load_32(&tmq->epoch);
L
temp  
Liu Jicong 已提交
1092
  tscDebug("consumer %ld recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
L
Liu Jicong 已提交
1093 1094
  if (head->epoch <= epoch) {
    goto END;
1095
  }
L
Liu Jicong 已提交
1096

L
Liu Jicong 已提交
1097 1098 1099
  if (!pParam->async) {
    SMqAskEpRsp rsp;
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
X
Xiaoyu Wang 已提交
1100 1101
    /*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/
    /*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/
1102
    tmqUpdateEp(tmq, head->epoch, &rsp);
L
Liu Jicong 已提交
1103
    tDeleteSMqAskEpRsp(&rsp);
X
Xiaoyu Wang 已提交
1104
  } else {
1105
    SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM);
L
Liu Jicong 已提交
1106
    if (pWrapper == NULL) {
X
Xiaoyu Wang 已提交
1107
      terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
1108 1109
      code = -1;
      goto END;
X
Xiaoyu Wang 已提交
1110
    }
L
Liu Jicong 已提交
1111 1112 1113
    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pWrapper->epoch = head->epoch;
    memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
L
Liu Jicong 已提交
1114
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);
L
Liu Jicong 已提交
1115

L
Liu Jicong 已提交
1116
    taosWriteQitem(tmq->mqueue, pWrapper);
1117
    tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
1118
    taosMemoryFree(pParam);
1119
  }
L
Liu Jicong 已提交
1120 1121

END:
L
Liu Jicong 已提交
1122
  /*atomic_store_8(&tmq->epStatus, 0);*/
L
Liu Jicong 已提交
1123
  if (!pParam->async) {
L
Liu Jicong 已提交
1124 1125 1126
    tsem_post(&pParam->rspSem);
  }
  return code;
1127 1128
}

L
Liu Jicong 已提交
1129
int32_t tmqAskEp(tmq_t* tmq, bool async) {
L
Liu Jicong 已提交
1130
  int32_t code = 0;
L
Liu Jicong 已提交
1131
#if 0
L
Liu Jicong 已提交
1132
  int8_t  epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
L
fix txn  
Liu Jicong 已提交
1133
  if (epStatus == 1) {
L
temp  
Liu Jicong 已提交
1134
    int32_t epSkipCnt = atomic_add_fetch_32(&tmq->epSkipCnt, 1);
L
Liu Jicong 已提交
1135
    tscTrace("consumer %ld skip ask ep cnt %d", tmq->consumerId, epSkipCnt);
L
Liu Jicong 已提交
1136
    if (epSkipCnt < 5000) return 0;
L
fix txn  
Liu Jicong 已提交
1137
  }
L
temp  
Liu Jicong 已提交
1138
  atomic_store_32(&tmq->epSkipCnt, 0);
L
Liu Jicong 已提交
1139
#endif
L
Liu Jicong 已提交
1140
  int32_t      tlen = sizeof(SMqAskEpReq);
L
Liu Jicong 已提交
1141
  SMqAskEpReq* req = taosMemoryCalloc(1, tlen);
L
Liu Jicong 已提交
1142
  if (req == NULL) {
L
Liu Jicong 已提交
1143
    tscError("failed to malloc get subscribe ep buf");
L
Liu Jicong 已提交
1144
    /*atomic_store_8(&tmq->epStatus, 0);*/
L
Liu Jicong 已提交
1145
    return -1;
L
Liu Jicong 已提交
1146
  }
L
Liu Jicong 已提交
1147 1148 1149
  req->consumerId = htobe64(tmq->consumerId);
  req->epoch = htonl(tmq->epoch);
  strcpy(req->cgroup, tmq->groupId);
1150

L
Liu Jicong 已提交
1151
  SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
L
Liu Jicong 已提交
1152 1153
  if (pParam == NULL) {
    tscError("failed to malloc subscribe param");
wafwerar's avatar
wafwerar 已提交
1154
    taosMemoryFree(req);
L
Liu Jicong 已提交
1155
    /*atomic_store_8(&tmq->epStatus, 0);*/
L
Liu Jicong 已提交
1156
    return -1;
L
Liu Jicong 已提交
1157 1158
  }
  pParam->tmq = tmq;
L
Liu Jicong 已提交
1159
  pParam->async = async;
X
Xiaoyu Wang 已提交
1160
  tsem_init(&pParam->rspSem, 0, 0);
1161

wafwerar's avatar
wafwerar 已提交
1162
  SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo));
L
Liu Jicong 已提交
1163 1164
  if (sendInfo == NULL) {
    tsem_destroy(&pParam->rspSem);
wafwerar's avatar
wafwerar 已提交
1165 1166
    taosMemoryFree(pParam);
    taosMemoryFree(req);
L
Liu Jicong 已提交
1167
    /*atomic_store_8(&tmq->epStatus, 0);*/
L
Liu Jicong 已提交
1168 1169 1170 1171 1172 1173 1174 1175 1176 1177
    return -1;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = req,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
L
Liu Jicong 已提交
1178 1179 1180
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqAskEpCb;
L
Liu Jicong 已提交
1181
  sendInfo->msgType = TDMT_MND_MQ_ASK_EP;
1182

L
Liu Jicong 已提交
1183 1184
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

L
add log  
Liu Jicong 已提交
1185 1186
  tscDebug("consumer %ld ask ep", tmq->consumerId);

L
Liu Jicong 已提交
1187 1188
  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
1189

L
Liu Jicong 已提交
1190
  if (!async) {
L
Liu Jicong 已提交
1191 1192 1193 1194 1195
    tsem_wait(&pParam->rspSem);
    code = pParam->code;
    taosMemoryFree(pParam);
  }
  return code;
1196 1197
}

L
Liu Jicong 已提交
1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211
tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
  const SMqOffset* pOffset = &offset->offset;
  if (strcmp(pOffset->cgroup, tmq->groupId) != 0) {
    return TMQ_RESP_ERR__FAIL;
  }
  int32_t sz = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < sz; i++) {
    SMqClientTopic* clientTopic = taosArrayGet(tmq->clientTopics, i);
    if (strcmp(clientTopic->topicName, pOffset->topicName) == 0) {
      int32_t vgSz = taosArrayGetSize(clientTopic->vgs);
      for (int32_t j = 0; j < vgSz; j++) {
        SMqClientVg* pVg = taosArrayGet(clientTopic->vgs, j);
        if (pVg->vgId == pOffset->vgId) {
          pVg->currentOffset = pOffset->offset;
L
Liu Jicong 已提交
1212
          tmqClearUnhandleMsg(tmq);
L
Liu Jicong 已提交
1213 1214 1215 1216 1217 1218 1219 1220
          return TMQ_RESP_ERR__SUCCESS;
        }
      }
    }
  }
  return TMQ_RESP_ERR__FAIL;
}

L
Liu Jicong 已提交
1221
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t waitTime, SMqClientTopic* pTopic, SMqClientVg* pVg) {
L
Liu Jicong 已提交
1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232
  int64_t reqOffset;
  if (pVg->currentOffset >= 0) {
    reqOffset = pVg->currentOffset;
  } else {
    if (tmq->resetOffsetCfg == TMQ_CONF__RESET_OFFSET__NONE) {
      tscError("unable to poll since no committed offset but reset offset is set to none");
      return NULL;
    }
    reqOffset = tmq->resetOffsetCfg;
  }

L
Liu Jicong 已提交
1233
  SMqPollReq* pReq = taosMemoryCalloc(1, sizeof(SMqPollReq));
1234 1235 1236
  if (pReq == NULL) {
    return NULL;
  }
L
Liu Jicong 已提交
1237

L
Liu Jicong 已提交
1238 1239 1240 1241 1242 1243 1244
  /*strcpy(pReq->topic, pTopic->topicName);*/
  /*strcpy(pReq->cgroup, tmq->groupId);*/

  int32_t tlen = strlen(tmq->groupId);
  memcpy(pReq->subKey, tmq->groupId, tlen);
  pReq->subKey[tlen] = TMQ_SEPARATOR;
  strcpy(pReq->subKey + tlen + 1, pTopic->topicName);
1245

1246
  pReq->withTbName = tmq->withTbName;
1247
  pReq->waitTime = waitTime;
L
Liu Jicong 已提交
1248
  pReq->consumerId = tmq->consumerId;
X
Xiaoyu Wang 已提交
1249
  pReq->epoch = tmq->epoch;
L
Liu Jicong 已提交
1250
  pReq->currentOffset = reqOffset;
L
Liu Jicong 已提交
1251
  pReq->reqId = generateRequestId();
1252 1253

  pReq->head.vgId = htonl(pVg->vgId);
L
Liu Jicong 已提交
1254
  pReq->head.contLen = htonl(sizeof(SMqPollReq));
1255 1256 1257
  return pReq;
}

L
Liu Jicong 已提交
1258 1259 1260
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  pRspObj->resType = RES_TYPE__TMQ;
L
Liu Jicong 已提交
1261
  strncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
L
Liu Jicong 已提交
1262
  pRspObj->vgId = pWrapper->vgHandle->vgId;
L
Liu Jicong 已提交
1263 1264
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->msg, sizeof(SMqDataBlkRsp));
L
Liu Jicong 已提交
1265

L
Liu Jicong 已提交
1266 1267
  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
L
Liu Jicong 已提交
1268 1269 1270
  if (!pWrapper->msg.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }
L
Liu Jicong 已提交
1271

L
Liu Jicong 已提交
1272 1273
  taosFreeQitem(pWrapper);
  return pRspObj;
X
Xiaoyu Wang 已提交
1274 1275
}

1276
int32_t tmqPollImpl(tmq_t* tmq, int64_t waitTime) {
L
fix  
Liu Jicong 已提交
1277
  /*printf("call poll\n");*/
X
Xiaoyu Wang 已提交
1278 1279 1280 1281 1282 1283
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      int32_t      vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
      if (vgStatus != TMQ_VG_STATUS__IDLE) {
L
Liu Jicong 已提交
1284
        int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
L
Liu Jicong 已提交
1285
        tscTrace("consumer %ld epoch %d skip vg %d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId, vgSkipCnt);
X
Xiaoyu Wang 已提交
1286
        continue;
L
Liu Jicong 已提交
1287
        /*if (vgSkipCnt < 10000) continue;*/
L
temp  
Liu Jicong 已提交
1288 1289 1290 1291 1292 1293 1294
#if 0
        if (skipCnt < 30000) {
          continue;
        } else {
        tscDebug("consumer %ld skip vg %d skip too much reset", tmq->consumerId, pVg->vgId);
        }
#endif
X
Xiaoyu Wang 已提交
1295
      }
L
Liu Jicong 已提交
1296
      atomic_store_32(&pVg->vgSkipCnt, 0);
L
Liu Jicong 已提交
1297
      SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, waitTime, pTopic, pVg);
X
Xiaoyu Wang 已提交
1298 1299
      if (pReq == NULL) {
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
1300
        tsem_post(&tmq->rspSem);
X
Xiaoyu Wang 已提交
1301 1302
        return -1;
      }
wafwerar's avatar
wafwerar 已提交
1303
      SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
L
Liu Jicong 已提交
1304
      if (pParam == NULL) {
wafwerar's avatar
wafwerar 已提交
1305
        taosMemoryFree(pReq);
X
Xiaoyu Wang 已提交
1306
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
1307
        tsem_post(&tmq->rspSem);
X
Xiaoyu Wang 已提交
1308 1309
        return -1;
      }
L
Liu Jicong 已提交
1310 1311
      pParam->tmq = tmq;
      pParam->pVg = pVg;
L
Liu Jicong 已提交
1312
      pParam->pTopic = pTopic;
L
Liu Jicong 已提交
1313
      pParam->vgId = pVg->vgId;
L
Liu Jicong 已提交
1314 1315
      pParam->epoch = tmq->epoch;

wafwerar's avatar
wafwerar 已提交
1316
      SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo));
L
Liu Jicong 已提交
1317
      if (sendInfo == NULL) {
wafwerar's avatar
wafwerar 已提交
1318 1319
        taosMemoryFree(pReq);
        taosMemoryFree(pParam);
L
Liu Jicong 已提交
1320
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
1321
        tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
1322 1323 1324 1325
        return -1;
      }

      sendInfo->msgInfo = (SDataBuf){
X
Xiaoyu Wang 已提交
1326
          .pData = pReq,
L
Liu Jicong 已提交
1327
          .len = sizeof(SMqPollReq),
X
Xiaoyu Wang 已提交
1328 1329
          .handle = NULL,
      };
L
Liu Jicong 已提交
1330
      sendInfo->requestId = pReq->reqId;
X
Xiaoyu Wang 已提交
1331
      sendInfo->requestObjRefId = 0;
L
Liu Jicong 已提交
1332
      sendInfo->param = pParam;
X
Xiaoyu Wang 已提交
1333
      sendInfo->fp = tmqPollCb;
L
Liu Jicong 已提交
1334
      sendInfo->msgType = TDMT_VND_CONSUME;
X
Xiaoyu Wang 已提交
1335 1336

      int64_t transporterId = 0;
L
fix  
Liu Jicong 已提交
1337
      /*printf("send poll\n");*/
L
Liu Jicong 已提交
1338 1339
      tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %ld, reqId %lu", tmq->consumerId,
               pTopic->topicName, pVg->vgId, tmq->epoch, pVg->currentOffset, pReq->reqId);
L
fix  
Liu Jicong 已提交
1340
      /*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/
X
Xiaoyu Wang 已提交
1341 1342 1343 1344 1345 1346 1347 1348
      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
      pVg->pollCnt++;
      tmq->pollCnt++;
    }
  }
  return 0;
}

L
Liu Jicong 已提交
1349 1350
int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
L
fix  
Liu Jicong 已提交
1351
    /*printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);*/
L
Liu Jicong 已提交
1352 1353
    if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) {
      SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
L
Liu Jicong 已提交
1354
      SMqAskEpRsp*        rspMsg = &pEpRspWrapper->msg;
L
Liu Jicong 已提交
1355
      tmqUpdateEp(tmq, rspWrapper->epoch, rspMsg);
L
temp  
Liu Jicong 已提交
1356
      /*tmqClearUnhandleMsg(tmq);*/
X
Xiaoyu Wang 已提交
1357 1358 1359 1360 1361 1362 1363 1364 1365 1366
      *pReset = true;
    } else {
      *pReset = false;
    }
  } else {
    return -1;
  }
  return 0;
}

1367
SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t waitTime, bool pollIfReset) {
X
Xiaoyu Wang 已提交
1368
  while (1) {
L
Liu Jicong 已提交
1369 1370 1371
    SMqRspWrapper* rspWrapper = NULL;
    taosGetQitem(tmq->qall, (void**)&rspWrapper);
    if (rspWrapper == NULL) {
L
Liu Jicong 已提交
1372
      taosReadAllQitems(tmq->mqueue, tmq->qall);
L
Liu Jicong 已提交
1373 1374
      taosGetQitem(tmq->qall, (void**)&rspWrapper);
      if (rspWrapper == NULL) return NULL;
X
Xiaoyu Wang 已提交
1375 1376
    }

L
Liu Jicong 已提交
1377 1378
    if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
L
Liu Jicong 已提交
1379
      /*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/
L
fix  
Liu Jicong 已提交
1380
      /*printf("handle poll rsp %d\n", rspMsg->head.mqMsgType);*/
L
Liu Jicong 已提交
1381
      if (pollRspWrapper->msg.head.epoch == atomic_load_32(&tmq->epoch)) {
L
fix  
Liu Jicong 已提交
1382
        /*printf("epoch match\n");*/
L
Liu Jicong 已提交
1383
        SMqClientVg* pVg = pollRspWrapper->vgHandle;
L
Liu Jicong 已提交
1384
        /*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/
L
Liu Jicong 已提交
1385
        pVg->currentOffset = pollRspWrapper->msg.rspOffset;
X
Xiaoyu Wang 已提交
1386
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
L
Liu Jicong 已提交
1387
        if (pollRspWrapper->msg.blockNum == 0) {
L
Liu Jicong 已提交
1388 1389
          taosFreeQitem(pollRspWrapper);
          rspWrapper = NULL;
L
temp  
Liu Jicong 已提交
1390 1391
          continue;
        }
L
Liu Jicong 已提交
1392
        // build rsp
L
Liu Jicong 已提交
1393 1394
        SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
        return pRsp;
X
Xiaoyu Wang 已提交
1395
      } else {
L
Liu Jicong 已提交
1396
        /*printf("epoch mismatch\n");*/
L
Liu Jicong 已提交
1397
        taosFreeQitem(pollRspWrapper);
X
Xiaoyu Wang 已提交
1398 1399
      }
    } else {
L
fix  
Liu Jicong 已提交
1400
      /*printf("handle ep rsp %d\n", rspMsg->head.mqMsgType);*/
X
Xiaoyu Wang 已提交
1401
      bool reset = false;
L
Liu Jicong 已提交
1402 1403
      tmqHandleNoPollRsp(tmq, rspWrapper, &reset);
      taosFreeQitem(rspWrapper);
X
Xiaoyu Wang 已提交
1404
      if (pollIfReset && reset) {
L
add log  
Liu Jicong 已提交
1405
        tscDebug("consumer %ld reset and repoll", tmq->consumerId);
1406
        tmqPollImpl(tmq, waitTime);
X
Xiaoyu Wang 已提交
1407 1408 1409 1410 1411
      }
    }
  }
}

1412
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) {
L
Liu Jicong 已提交
1413 1414
  SMqRspObj* rspObj;
  int64_t    startTime = taosGetTimestampMs();
L
Liu Jicong 已提交
1415

1416
  rspObj = tmqHandleAllRsp(tmq, wait_time, false);
L
Liu Jicong 已提交
1417 1418
  if (rspObj) {
    return (TAOS_RES*)rspObj;
L
fix  
Liu Jicong 已提交
1419
  }
X
Xiaoyu Wang 已提交
1420

L
Liu Jicong 已提交
1421
  // in no topic status also need process delayed task
L
Liu Jicong 已提交
1422
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
1423 1424 1425
    return NULL;
  }

X
Xiaoyu Wang 已提交
1426
  while (1) {
L
Liu Jicong 已提交
1427
    tmqHandleAllDelayedTask(tmq);
1428
    tmqPollImpl(tmq, wait_time);
L
Liu Jicong 已提交
1429

1430
    rspObj = tmqHandleAllRsp(tmq, wait_time, false);
L
Liu Jicong 已提交
1431 1432
    if (rspObj) {
      return (TAOS_RES*)rspObj;
X
Xiaoyu Wang 已提交
1433
    }
1434
    if (wait_time != 0) {
X
Xiaoyu Wang 已提交
1435
      int64_t endTime = taosGetTimestampMs();
1436 1437
      int64_t leftTime = endTime - startTime;
      if (leftTime > wait_time) {
L
Liu Jicong 已提交
1438
        tscDebug("consumer %ld (epoch %d) timeout, no rsp", tmq->consumerId, tmq->epoch);
X
Xiaoyu Wang 已提交
1439 1440
        return NULL;
      }
1441
      tsem_timewait(&tmq->rspSem, leftTime * 1000);
L
Liu Jicong 已提交
1442 1443 1444
    } else {
      // use tsem_timewait instead of tsem_wait to avoid unexpected stuck
      tsem_timewait(&tmq->rspSem, 500 * 1000);
X
Xiaoyu Wang 已提交
1445 1446 1447 1448
    }
  }
}

L
Liu Jicong 已提交
1449
tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) {
1450 1451 1452 1453 1454 1455 1456 1457 1458 1459
  if (tmq->status == TMQ_CONSUMER_STATUS__READY) {
    tmq_list_t*    lst = tmq_list_new();
    tmq_resp_err_t rsp = tmq_subscribe(tmq, lst);
    tmq_list_destroy(lst);
    if (rsp == TMQ_RESP_ERR__SUCCESS) {
      // TODO: free resources
      return TMQ_RESP_ERR__SUCCESS;
    } else {
      return TMQ_RESP_ERR__FAIL;
    }
L
Liu Jicong 已提交
1460
  }
1461 1462
  // TODO: free resources
  return TMQ_RESP_ERR__SUCCESS;
1463
}
L
Liu Jicong 已提交
1464 1465 1466 1467 1468 1469 1470

const char* tmq_err2str(tmq_resp_err_t err) {
  if (err == TMQ_RESP_ERR__SUCCESS) {
    return "success";
  }
  return "fail";
}
L
Liu Jicong 已提交
1471

L
Liu Jicong 已提交
1472
const char* tmq_get_topic_name(TAOS_RES* res) {
L
Liu Jicong 已提交
1473 1474
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
L
Liu Jicong 已提交
1475
    return strchr(pRspObj->topic, '.') + 1;
L
Liu Jicong 已提交
1476 1477 1478 1479 1480 1481 1482 1483 1484 1485 1486 1487 1488
  } else {
    return NULL;
  }
}

int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
    return pRspObj->vgId;
  } else {
    return -1;
  }
}
L
Liu Jicong 已提交
1489 1490 1491 1492 1493 1494 1495 1496 1497 1498 1499 1500 1501

const char* tmq_get_table_name(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
    if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 ||
        pRspObj->resIter >= pRspObj->rsp.blockNum) {
      return NULL;
    }
    const char* name = taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter);
    return name;
  }
  return NULL;
}
L
Liu Jicong 已提交
1502 1503 1504 1505 1506 1507 1508
DLL_EXPORT void tmq_commit_async(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, tmq_commit_cb* cb, void* param) {
  tmqCommitInner(tmq, offsets, 0, 1, cb, param);
}

DLL_EXPORT tmq_resp_err_t tmq_commit_sync(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets) {
  return tmqCommitInner(tmq, offsets, 0, 0, NULL, NULL);
}