tmq.c 44.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
H
Haojun Liao 已提交
19
#include "tdatablock.h"
20 21 22
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
X
Xiaoyu Wang 已提交
23
#include "tqueue.h"
24
#include "tref.h"
L
Liu Jicong 已提交
25 26
#include "ttimer.h"

L
Liu Jicong 已提交
27 28 29 30 31 32 33 34
// Forward declaration; the definition is not part of this section. It is called with
// async == true from the delayed heartbeat task and with async == false from
// tmq_subscribe(), which retries while it returns TSDB_CODE_MND_CONSUMER_NOT_READY.
int32_t tmqAskEp(tmq_t* tmq, bool async);

// Process-wide TMQ state shared by every consumer created through tmq_consumer_new().
typedef struct {
  int8_t inited;  // 0 until the first tmq_consumer_new() claims initialization via CAS, then 1
  tmr_h  timer;   // timer controller returned by taosTmrInit(); all consumer timers are started on it
} SMqMgmt;

static SMqMgmt tmqMgmt = {0};
35

L
Liu Jicong 已提交
36 37 38 39 40 41
typedef struct {
  int8_t  tmqRspType;
  int32_t epoch;
} SMqRspWrapper;

typedef struct {
L
Liu Jicong 已提交
42 43 44
  int8_t      tmqRspType;
  int32_t     epoch;
  SMqAskEpRsp msg;
L
Liu Jicong 已提交
45 46
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
47
struct tmq_list_t {
L
Liu Jicong 已提交
48
  SArray container;
L
Liu Jicong 已提交
49
};
L
Liu Jicong 已提交
50

L
Liu Jicong 已提交
51
struct tmq_topic_vgroup_t {
L
Liu Jicong 已提交
52
  SMqOffset offset;
L
Liu Jicong 已提交
53 54 55
};

struct tmq_topic_vgroup_list_t {
L
Liu Jicong 已提交
56
  SArray container;  // SArray<tmq_topic_vgroup_t*>
L
Liu Jicong 已提交
57 58 59
};

struct tmq_conf_t {
60 61 62 63 64 65 66 67 68 69 70
  char     clientId[256];
  char     groupId[TSDB_CGROUP_LEN];
  int8_t   autoCommit;
  int8_t   resetOffset;
  int8_t   withTbName;
  uint16_t port;
  int32_t  autoCommitInterval;
  char*    ip;
  char*    user;
  char*    pass;
  /*char*          db;*/
71
  tmq_commit_cb* commitCb;
L
Liu Jicong 已提交
72
  void*          commitCbUserParam;
L
Liu Jicong 已提交
73 74 75
};

struct tmq_t {
L
Liu Jicong 已提交
76
  // conf
L
Liu Jicong 已提交
77 78
  char           groupId[TSDB_CGROUP_LEN];
  char           clientId[256];
79
  int8_t         withTbName;
L
Liu Jicong 已提交
80
  int8_t         autoCommit;
L
Liu Jicong 已提交
81
  int32_t        autoCommitInterval;
L
Liu Jicong 已提交
82
  int32_t        resetOffsetCfg;
L
Liu Jicong 已提交
83
  int64_t        consumerId;
L
Liu Jicong 已提交
84 85
  tmq_commit_cb* commitCb;
  void*          commitCbUserParam;
L
Liu Jicong 已提交
86 87 88 89

  // status
  int8_t  status;
  int32_t epoch;
L
Liu Jicong 已提交
90 91
#if 0
  int8_t  epStatus;
L
Liu Jicong 已提交
92
  int32_t epSkipCnt;
L
Liu Jicong 已提交
93
#endif
L
Liu Jicong 已提交
94 95
  int64_t pollCnt;

L
Liu Jicong 已提交
96 97 98 99 100
  // timer
  tmr_h hbTimer;
  tmr_h reportTimer;
  tmr_h commitTimer;

L
Liu Jicong 已提交
101 102 103 104
  // connection
  STscObj* pTscObj;

  // container
L
Liu Jicong 已提交
105
  SArray*     clientTopics;  // SArray<SMqClientTopic>
L
Liu Jicong 已提交
106
  STaosQueue* mqueue;        // queue of rsp
L
Liu Jicong 已提交
107
  STaosQall*  qall;
L
Liu Jicong 已提交
108 109 110 111
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit

  // ctl
  tsem_t rspSem;
L
Liu Jicong 已提交
112 113
};

X
Xiaoyu Wang 已提交
114 115 116 117 118 119 120 121
// Per-vgroup state, stored in SMqClientVg.vgStatus.
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT,
};

// Consumer state, stored in tmq_t.status.
enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY,
  TMQ_CONSUMER_STATUS__NO_TOPIC,
};

// Task kinds queued on tmq_t.delayedTask by the timer callbacks.
enum {
  TMQ_DELAYED_TASK__HB = 1,
  TMQ_DELAYED_TASK__REPORT,
  TMQ_DELAYED_TASK__COMMIT,
};

L
Liu Jicong 已提交
131
typedef struct {
132 133 134 135
  // statistics
  int64_t pollCnt;
  // offset
  int64_t currentOffset;
L
Liu Jicong 已提交
136
  // connection info
137
  int32_t vgId;
X
Xiaoyu Wang 已提交
138
  int32_t vgStatus;
L
Liu Jicong 已提交
139
  int32_t vgSkipCnt;
140 141 142
  SEpSet  epSet;
} SMqClientVg;

L
Liu Jicong 已提交
143
typedef struct {
144
  // subscribe info
L
Liu Jicong 已提交
145
  char* topicName;
L
Liu Jicong 已提交
146
  char  db[TSDB_DB_FNAME_LEN];
L
Liu Jicong 已提交
147 148 149

  SArray* vgs;  // SArray<SMqClientVg>

L
Liu Jicong 已提交
150 151
  int8_t         isSchemaAdaptive;
  SSchemaWrapper schema;
152 153
} SMqClientTopic;

L
Liu Jicong 已提交
154 155 156 157 158
typedef struct {
  int8_t          tmqRspType;
  int32_t         epoch;
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
L
Liu Jicong 已提交
159
  SMqDataBlkRsp   msg;
L
Liu Jicong 已提交
160 161
} SMqPollRspWrapper;

L
Liu Jicong 已提交
162
typedef struct {
L
Liu Jicong 已提交
163 164 165 166
  tmq_t*         tmq;
  tsem_t         rspSem;
  tmq_resp_err_t rspErr;
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
167

L
Liu Jicong 已提交
168
typedef struct {
169
  tmq_t*  tmq;
L
Liu Jicong 已提交
170
  int32_t code;
L
Liu Jicong 已提交
171
  int32_t async;
X
Xiaoyu Wang 已提交
172
  tsem_t  rspSem;
173 174
} SMqAskEpCbParam;

L
Liu Jicong 已提交
175
typedef struct {
L
Liu Jicong 已提交
176 177
  tmq_t*          tmq;
  SMqClientVg*    pVg;
L
Liu Jicong 已提交
178
  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
179
  int32_t         epoch;
L
Liu Jicong 已提交
180
  int32_t         vgId;
L
Liu Jicong 已提交
181
  tsem_t          rspSem;
X
Xiaoyu Wang 已提交
182
} SMqPollCbParam;
183

L
Liu Jicong 已提交
184
typedef struct {
L
Liu Jicong 已提交
185
  tmq_t*         tmq;
L
Liu Jicong 已提交
186 187
  int8_t         async;
  int8_t         automatic;
L
Liu Jicong 已提交
188
  int8_t         freeOffsets;
L
Liu Jicong 已提交
189
  tmq_commit_cb* userCb;
L
Liu Jicong 已提交
190 191
  tsem_t         rspSem;
  tmq_resp_err_t rspErr;
L
Liu Jicong 已提交
192
  SArray*        offsets;
L
Liu Jicong 已提交
193
  void*          userParam;
L
Liu Jicong 已提交
194
} SMqCommitCbParam;
L
Liu Jicong 已提交
195

196
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
197
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
198
  conf->withTbName = false;
L
Liu Jicong 已提交
199
  conf->autoCommit = true;
L
Liu Jicong 已提交
200
  conf->autoCommitInterval = 5000;
X
Xiaoyu Wang 已提交
201
  conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
202 203 204
  return conf;
}

L
Liu Jicong 已提交
205
// Releases a configuration created by tmq_conf_new(), including the ip/user/pass strings
// duplicated by tmq_conf_set(). A NULL `conf` is ignored.
void tmq_conf_destroy(tmq_conf_t* conf) {
  if (conf == NULL) {
    return;
  }

  if (conf->ip) taosMemoryFree(conf->ip);
  if (conf->user) taosMemoryFree(conf->user);
  if (conf->pass) taosMemoryFree(conf->pass);
  taosMemoryFree(conf);
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
215 216
  if (strcmp(key, "group.id") == 0) {
    strcpy(conf->groupId, value);
L
Liu Jicong 已提交
217
    return TMQ_CONF_OK;
218
  }
L
Liu Jicong 已提交
219

220 221
  if (strcmp(key, "client.id") == 0) {
    strcpy(conf->clientId, value);
L
Liu Jicong 已提交
222 223
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
224

L
Liu Jicong 已提交
225 226
  if (strcmp(key, "enable.auto.commit") == 0) {
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
227
      conf->autoCommit = true;
L
Liu Jicong 已提交
228 229
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
230
      conf->autoCommit = false;
L
Liu Jicong 已提交
231 232 233 234
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
235
  }
L
Liu Jicong 已提交
236

L
Liu Jicong 已提交
237 238 239 240 241
  if (strcmp(key, "auto.commit.interval.ms") == 0) {
    conf->autoCommitInterval = atoi(value);
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
242 243 244 245 246 247 248 249 250 251 252 253 254 255
  if (strcmp(key, "auto.offset.reset") == 0) {
    if (strcmp(value, "none") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "earliest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
256

257 258
  if (strcmp(key, "msg.with.table.name") == 0) {
    if (strcmp(value, "true") == 0) {
259
      conf->withTbName = true;
L
Liu Jicong 已提交
260
      return TMQ_CONF_OK;
261
    } else if (strcmp(value, "false") == 0) {
262
      conf->withTbName = false;
L
Liu Jicong 已提交
263
      return TMQ_CONF_OK;
264 265 266 267 268
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
269
  if (strcmp(key, "td.connect.ip") == 0) {
L
Liu Jicong 已提交
270 271 272
    conf->ip = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
273
  if (strcmp(key, "td.connect.user") == 0) {
L
Liu Jicong 已提交
274 275 276
    conf->user = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
277
  if (strcmp(key, "td.connect.pass") == 0) {
L
Liu Jicong 已提交
278 279 280
    conf->pass = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
281
  if (strcmp(key, "td.connect.port") == 0) {
L
Liu Jicong 已提交
282 283 284
    conf->port = atoi(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
285
  if (strcmp(key, "td.connect.db") == 0) {
286
    /*conf->db = strdup(value);*/
L
Liu Jicong 已提交
287 288 289
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
290
  return TMQ_CONF_UNKNOWN;
291 292 293
}

tmq_list_t* tmq_list_new() {
L
Liu Jicong 已提交
294 295
  //
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
296 297
}

L
Liu Jicong 已提交
298 299 300
// Appends a private copy of `src` to `list`.
// Returns 0 on success, -1 when an argument is NULL, the copy cannot be allocated, or
// the array push fails. The list owns the copy; tmq_list_destroy() releases it.
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  if (list == NULL || src == NULL) return -1;

  SArray* container = &list->container;
  char*   topic = strdup(src);
  if (topic == NULL) return -1;

  if (taosArrayPush(container, &topic) == NULL) {
    // The list did not take ownership, so release the copy here instead of leaking it.
    // taosMemoryFree is the same deallocator tmq_list_destroy() applies to these strings.
    taosMemoryFree(topic);
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
305
// Destroys a topic list, freeing every stored topic string with taosMemoryFree.
// A NULL `list` is ignored.
void tmq_list_destroy(tmq_list_t* list) {
  if (list == NULL) return;
  taosArrayDestroyP(&list->container, taosMemoryFree);
}

L
Liu Jicong 已提交
310 311 312 313 314 315 316 317 318 319
// Returns the number of topic names stored in `list`.
int32_t tmq_list_get_size(const tmq_list_t* list) {
  return taosArrayGetSize(&list->container);
}

// Exposes the list's backing storage as an array of C strings. The returned pointer is
// the array's own data buffer, not a copy.
char** tmq_list_to_c_array(const tmq_list_t* list) {
  return list->container.pData;
}

L
Liu Jicong 已提交
320 321 322 323
// Formats "<topicName>:<vg>" into `dst` and returns the number of characters written,
// excluding the terminating NUL. The write is unbounded: `dst` must be large enough for
// the topic name, the colon, the decimal vgroup id and the terminator.
static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
  const int32_t written = sprintf(dst, "%s:%d", topicName, vg);
  return written;
}

L
Liu Jicong 已提交
324 325 326 327 328 329 330 331 332 333 334
int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
  SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
  pParam->rspErr = code == 0 ? TMQ_RESP_ERR__SUCCESS : TMQ_RESP_ERR__FAIL;
  if (pParam->async) {
    if (pParam->automatic && pParam->tmq->commitCb) {
      pParam->tmq->commitCb(pParam->tmq, pParam->rspErr, (tmq_topic_vgroup_list_t*)pParam->offsets,
                            pParam->tmq->commitCbUserParam);
    } else if (!pParam->automatic && pParam->userCb) {
      pParam->userCb(pParam->tmq, pParam->rspErr, (tmq_topic_vgroup_list_t*)pParam->offsets, pParam->userParam);
    }

L
Liu Jicong 已提交
335
    if (pParam->freeOffsets) {
L
Liu Jicong 已提交
336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352
      taosArrayDestroy(pParam->offsets);
    }

    taosMemoryFree(pParam);
  } else {
    tsem_post(&pParam->rspSem);
  }
  return 0;
}

// Sends a commit-offset request to the mnode.
//
//   offsets   - offsets to commit, or NULL to commit the current offset of every vgroup
//               of every subscribed topic (a temporary array is then built internally).
//   automatic - non-zero for timer-driven commits; results go to tmq->commitCb.
//   async     - non-zero: return right after sending, the outcome is delivered through
//               the callback; zero: block until the response arrives.
//   userCb / userParam - callback and argument used for non-automatic async commits.
//
// Returns 0 on success. Synchronous commits return the response status set by
// tmqCommitCb(). A failure before sending returns a negative value and, in async mode,
// is also reported to the callback when one is set.
//
// Ownership: once the message is handed to asyncSendMsgToServer(), `buf` and `sendInfo`
// are no longer freed here. In async mode tmqCommitCb() frees the callback parameter and
// the internally built offsets array.
// NOTE(review): the return value of asyncSendMsgToServer() is still ignored, as before.
// If it can fail without invoking the callback, a synchronous commit would block.
int32_t tmqCommitInner(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8_t automatic, int8_t async,
                       tmq_commit_cb* userCb, void* userParam) {
  SMqCMCommitOffsetReq req;
  SEncoder             encoder;
  SArray*              pOffsets = NULL;
  void*                buf = NULL;
  SMqCommitCbParam*    pParam = NULL;
  SMsgSendInfo*        sendInfo = NULL;
  int8_t               freeOffsets = 0;
  int32_t              tlen = 0;
  int32_t              code = -1;

  if (offsets == NULL) {
    freeOffsets = 1;
    pOffsets = taosArrayInit(0, sizeof(SMqOffset));
    if (pOffsets == NULL) goto END;
    for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
      SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
      for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
        SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
        SMqOffset    offset;
        tstrncpy(offset.topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
        tstrncpy(offset.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
        offset.vgId = pVg->vgId;
        offset.offset = pVg->currentOffset;
        if (taosArrayPush(pOffsets, &offset) == NULL) goto END;
      }
    }
  } else {
    pOffsets = (SArray*)&offsets->container;
  }

  req.num = (int32_t)pOffsets->size;
  req.offsets = pOffsets->pData;

  // Pass 1: encode without a buffer to learn the required size.
  tEncoderInit(&encoder, NULL, 0);
  code = tEncodeSMqCMCommitOffsetReq(&encoder, &req);
  tlen = encoder.pos;
  tEncoderClear(&encoder);
  if (code < 0) goto END;

  // From here on a failure must not be reported as success. The encoder's success value
  // used to leak through when a later allocation failed.
  code = -1;

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto END;

  // Pass 2: encode into the buffer.
  tEncoderInit(&encoder, buf, tlen);
  if (tEncodeSMqCMCommitOffsetReq(&encoder, &req) < 0) {
    tEncoderClear(&encoder);
    goto END;
  }
  tEncoderClear(&encoder);

  pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
  if (pParam == NULL) goto END;

  sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo));
  if (sendInfo == NULL) goto END;

  pParam->tmq = tmq;
  pParam->automatic = automatic;
  pParam->async = async;
  pParam->offsets = pOffsets;
  pParam->freeOffsets = freeOffsets;
  pParam->userCb = userCb;
  pParam->userParam = userParam;
  if (!async && tsem_init(&pParam->rspSem, 0, 0) != 0) goto END;

  sendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqCommitCb;
  sendInfo->msgType = TDMT_MND_MQ_COMMIT_OFFSET;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  // avoid double free if msg is sent
  buf = NULL;
  sendInfo = NULL;

  if (async) {
    // tmqCommitCb() now owns pParam and, when freeOffsets is set, pOffsets. It may
    // already have run, so neither may be touched again here.
    pParam = NULL;
    code = 0;
  } else {
    tsem_wait(&pParam->rspSem);
    // Keep the real outcome. It used to be overwritten with 0 right after being read.
    code = pParam->rspErr;
    tsem_destroy(&pParam->rspSem);
  }

END:
  if (buf) taosMemoryFree(buf);
  if (sendInfo) taosMemoryFree(sendInfo);
  if (pParam) taosMemoryFree(pParam);

  if (code != 0 && async) {
    // Only reachable for failures before the send; mirror tmqCommitCb()'s NULL checks.
    if (automatic) {
      if (tmq->commitCb) {
        tmq->commitCb(tmq, TMQ_RESP_ERR__FAIL, (tmq_topic_vgroup_list_t*)pOffsets, tmq->commitCbUserParam);
      }
    } else if (userCb) {
      userCb(tmq, TMQ_RESP_ERR__FAIL, (tmq_topic_vgroup_list_t*)pOffsets, userParam);
    }
  }

  // The internally built array is released here unless an async callback owns it.
  if (freeOffsets && pOffsets != NULL && (!async || code != 0)) {
    taosArrayDestroy(pOffsets);
  }
  return code;
}

L
Liu Jicong 已提交
460 461
void tmqAssignDelayedHbTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
462
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
L
Liu Jicong 已提交
463 464
  *pTaskType = TMQ_DELAYED_TASK__HB;
  taosWriteQitem(tmq->delayedTask, pTaskType);
465
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
466 467 468 469
}

void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
470
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
L
Liu Jicong 已提交
471 472
  *pTaskType = TMQ_DELAYED_TASK__COMMIT;
  taosWriteQitem(tmq->delayedTask, pTaskType);
473
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
474 475 476 477
}

void tmqAssignDelayedReportTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
478
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
L
Liu Jicong 已提交
479 480
  *pTaskType = TMQ_DELAYED_TASK__REPORT;
  taosWriteQitem(tmq->delayedTask, pTaskType);
481
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
482 483 484 485 486 487 488 489 490 491 492
}

// Drains tmq->delayedTask and runs every queued task:
//   HB     - asks for endpoints asynchronously and re-arms the heartbeat timer (1000 ms);
//   COMMIT - starts an automatic async commit of all current offsets and re-arms the
//            commit timer with tmq->autoCommitInterval;
//   REPORT - currently a no-op.
// Returns 0, or -1 when the temporary batch container cannot be allocated; the queued
// tasks then stay on the queue.
int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
  STaosQall* qall = taosAllocateQall();
  if (qall == NULL) {
    return -1;
  }

  taosReadAllQitems(tmq->delayedTask, qall);
  while (1) {
    int8_t* pTaskType = NULL;
    taosGetQitem(qall, (void**)&pTaskType);
    if (pTaskType == NULL) break;

    if (*pTaskType == TMQ_DELAYED_TASK__HB) {
      tmqAskEp(tmq, true);
      taosTmrReset(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer, &tmq->hbTimer);
    } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
      tmqCommitInner(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam);
      taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer, &tmq->commitTimer);
    } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
      // nothing to do yet
    } else {
      ASSERT(0);
    }
    taosFreeQitem(pTaskType);
  }
  taosFreeQall(qall);
  return 0;
}

L
Liu Jicong 已提交
509
void tmqClearUnhandleMsg(tmq_t* tmq) {
L
Liu Jicong 已提交
510
  SMqRspWrapper* msg = NULL;
L
Liu Jicong 已提交
511 512 513 514 515 516 517 518
  while (1) {
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg)
      taosFreeQitem(msg);
    else
      break;
  }

L
fix  
Liu Jicong 已提交
519
  msg = NULL;
L
Liu Jicong 已提交
520 521 522 523 524 525 526 527 528 529
  taosReadAllQitems(tmq->mqueue, tmq->qall);
  while (1) {
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg)
      taosFreeQitem(msg);
    else
      break;
  }
}

L
Liu Jicong 已提交
530 531 532
int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) {
  SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
  pParam->rspErr = code;
L
Liu Jicong 已提交
533
  /*tmq_t* tmq = pParam->tmq;*/
L
Liu Jicong 已提交
534 535 536
  tsem_post(&pParam->rspSem);
  return 0;
}
537

X
Xiaoyu Wang 已提交
538 539 540 541 542
// Appends the names of the currently subscribed topics to *topics, creating the list
// first when *topics is NULL.
//
// Stored topic names are reported without their prefix up to the first '.'; a name
// without a '.' is reported unchanged instead of dereferencing NULL + 1.
// NOTE(review): the prefix is presumably the account id added by tmq_subscribe(); confirm.
//
// Returns TMQ_RESP_ERR__SUCCESS, or TMQ_RESP_ERR__FAIL when an argument is NULL, the
// list cannot be created, or an append fails. A list created here is left in *topics
// for the caller to destroy.
tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  if (tmq == NULL || topics == NULL) {
    return TMQ_RESP_ERR__FAIL;
  }

  if (*topics == NULL) {
    *topics = tmq_list_new();
    if (*topics == NULL) {
      return TMQ_RESP_ERR__FAIL;
    }
  }

  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
    const char*     dot = strchr(topic->topicName, '.');
    const char*     shortName = (dot != NULL) ? dot + 1 : topic->topicName;
    if (tmq_list_append(*topics, shortName) != 0) {
      return TMQ_RESP_ERR__FAIL;
    }
  }
  return TMQ_RESP_ERR__SUCCESS;
}
}

// Drops all subscriptions by subscribing to an empty topic list.
// Returns the result of tmq_subscribe(), or TMQ_RESP_ERR__FAIL when the temporary list
// cannot be allocated.
tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq) {
  tmq_list_t* lst = tmq_list_new();
  if (lst == NULL) {
    return TMQ_RESP_ERR__FAIL;
  }

  tmq_resp_err_t rsp = tmq_subscribe(tmq, lst);
  tmq_list_destroy(lst);
  return rsp;
}

L
Liu Jicong 已提交
556
// A legacy tmq_consumer_new(void* conn, tmq_conf_t* conf, ...) used to live here under
// `#if 0`. It referenced members that no longer exist (commit_cb, epStatus, epSkipCnt)
// and could not be re-enabled, so the disabled copy was removed. Version control keeps it.
L
Liu Jicong 已提交
590

L
Liu Jicong 已提交
591
// Copies `msg` into the caller's error buffer when one was supplied.
static void tmqSetErrStr(char* errstr, int32_t errstrLen, const char* msg) {
  if (errstr != NULL && errstrLen > 0) {
    snprintf(errstr, (size_t)errstrLen, "%s", msg);
  }
}

// Creates a consumer from `conf`.
//
// Steps: initialize the process-wide timer controller on first use, allocate the
// consumer and its queues, copy the configuration, generate the consumer id, initialize
// the response semaphore and open the connection. Credentials default to
// TSDB_DEFAULT_USER / TSDB_DEFAULT_PASS when not configured.
//
// Returns the new consumer, or NULL on failure. On failure a short description is
// written to `errstr` (at most `errstrLen` bytes including the terminator) when a buffer
// is supplied. A NULL `conf` or an unset "group.id" is reported as a failure rather than
// crashing or asserting.
//
// NOTE(review): `tmqMgmt.inited` is set to 1 before `tmqMgmt.timer` is assigned, so a
// second thread entering concurrently could observe inited == 1 with the timer still
// NULL. Confirm whether concurrent first calls are possible.
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  const char* failMsg = "failed to allocate consumer resources";

  if (conf == NULL) {
    tmqSetErrStr(errstr, errstrLen, "conf is NULL");
    return NULL;
  }
  if (conf->groupId[0] == '\0') {
    tmqSetErrStr(errstr, errstrLen, "group.id is not set");
    return NULL;
  }

  // init timer
  int8_t inited = atomic_val_compare_exchange_8(&tmqMgmt.inited, 0, 1);
  if (inited == 0) {
    tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
    if (tmqMgmt.timer == NULL) {
      atomic_store_8(&tmqMgmt.inited, 0);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      tmqSetErrStr(errstr, errstrLen, "failed to init timer");
      return NULL;
    }
  }

  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    tmqSetErrStr(errstr, errstrLen, failMsg);
    return NULL;
  }

  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  ASSERT(user);
  ASSERT(pass);

  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();
  pTmq->delayedTask = taosOpenQueue();

  if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
    goto FAIL;
  }

  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;

  // set conf; source and destination buffers have identical sizes
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
  pTmq->withTbName = conf->withTbName;
  pTmq->autoCommit = conf->autoCommit;
  pTmq->autoCommitInterval = conf->autoCommitInterval;
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
  pTmq->resetOffsetCfg = conf->resetOffset;

  // assign consumerId
  pTmq->consumerId = tGenIdPI64();

  // init semaphore
  if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
    failMsg = "failed to init semaphore";
    goto FAIL;
  }

  // init connection
  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) {
    tsem_destroy(&pTmq->rspSem);
    failMsg = "failed to connect to server";
    goto FAIL;
  }

  return pTmq;

FAIL:
  if (pTmq->clientTopics) taosArrayDestroy(pTmq->clientTopics);
  if (pTmq->mqueue) taosCloseQueue(pTmq->mqueue);
  if (pTmq->delayedTask) taosCloseQueue(pTmq->delayedTask);
  if (pTmq->qall) taosFreeQall(pTmq->qall);
  taosMemoryFree(pTmq);
  tmqSetErrStr(errstr, errstrLen, failMsg);
  return NULL;
}

L
Liu Jicong 已提交
667
// Commits offsets on behalf of the application (a non-automatic commit).
//
//   offsets - offsets to commit, or NULL to commit the current offset of every
//             subscribed vgroup.
//   async   - zero to wait for the server response, any non-zero value to return right
//             after sending. The flag is normalized to 0/1 because tmqCommitInner()
//             takes an int8_t; a value such as 256 would otherwise truncate to 0.
//
// The consumer's configured commit callback and user parameter are forwarded as the
// callback for this commit. Returns the result of tmqCommitInner(), or
// TMQ_RESP_ERR__FAIL when `tmq` is NULL.
tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) {
  if (tmq == NULL) {
    return TMQ_RESP_ERR__FAIL;
  }
  return tmqCommitInner(tmq, offsets, 0, (int8_t)(async != 0), tmq->commitCb, tmq->commitCbUserParam);
}

L
Liu Jicong 已提交
759 760 761 762 763 764
// Subscribes the consumer to exactly the topics in `topic_list`; tmq_unsubscribe()
// passes an empty list.
//
// Steps: build the full name of every topic from the connection's account id, serialize
// and send a subscribe request to the mnode, wait for the response, poll tmqAskEp()
// every 500 ms while it reports TSDB_CODE_MND_CONSUMER_NOT_READY, then start the
// heartbeat timer and, when auto commit is enabled, the commit timer, if they are not
// already running.
//
// Returns 0 on success, the server's error code when the request is rejected, or -1 on
// a local failure. A NULL argument yields TMQ_RESP_ERR__FAIL.
// NOTE(review): the readiness loop has no retry limit, as before.
tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
  if (tmq == NULL || topic_list == NULL) {
    return TMQ_RESP_ERR__FAIL;
  }

  const SArray*       container = &topic_list->container;
  int32_t             sz = taosArrayGetSize(container);
  void*               buf = NULL;
  void*               abuf = NULL;
  SMsgSendInfo*       sendInfo = NULL;
  SCMSubscribeReq     req = {0};
  int32_t             tlen = 0;
  int32_t             code = -1;
  SMqSubscribeCbParam param = {
      .rspErr = TMQ_RESP_ERR__SUCCESS,
      .tmq = tmq,
  };

  req.consumerId = tmq->consumerId;
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
  req.topicNames = taosArrayInit(sz, sizeof(void*));
  if (req.topicNames == NULL) goto FAIL;

  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);

    SName name = {0};
    tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));

    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      goto FAIL;
    }
    tNameExtractFullName(&name, topicFName);

    tscDebug("subscribe topic: %s", topicFName);

    if (taosArrayPush(req.topicNames, &topicFName) == NULL) {
      // The array did not take ownership, so the name would otherwise leak.
      taosMemoryFree(topicFName);
      goto FAIL;
    }
  }

  tlen = tSerializeSCMSubscribeReq(NULL, &req);
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto FAIL;

  abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);

  sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo));
  if (sendInfo == NULL) goto FAIL;

  // On failure here sendInfo is released at FAIL; it used to leak.
  if (tsem_init(&param.rspSem, 0, 0) != 0) goto FAIL;

  sendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = &param;  // stack storage: valid because this function waits for the callback
  sendInfo->fp = tmqSubscribeCb;
  sendInfo->msgType = TDMT_MND_SUBSCRIBE;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  // avoid double free if msg is sent
  buf = NULL;
  sendInfo = NULL;

  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);

  code = param.rspErr;
  if (code != 0) goto FAIL;

  while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
    tscDebug("consumer not ready, retry");
    taosMsleep(500);
  }

  // init hb timer
  if (tmq->hbTimer == NULL) {
    tmq->hbTimer = taosTmrStart(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer);
  }

  // init auto commit timer
  if (tmq->autoCommit && tmq->commitTimer == NULL) {
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer);
  }

  code = 0;
FAIL:
  if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree);
  // Both pointers are non-NULL only when the request was never sent.
  if (buf) taosMemoryFree(buf);
  if (sendInfo) taosMemoryFree(sendInfo);
  return code;
}

L
Liu Jicong 已提交
855
// Registers the callback invoked when an automatic commit completes, together with the
// opaque `param` that is handed back to it. tmq_consumer_new() copies both into the
// consumer. A NULL `conf` is ignored.
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
  if (conf == NULL) return;
  conf->commitCb = cb;
  conf->commitCbUserParam = param;
}
860

861
#if 0
// NOTE: this function is compiled out by the surrounding #if 0.
//
// Builds a request from `sql`, parses it, serializes the resulting AST into a
// SCMCreateStreamReq (type TDMT_MND_CREATE_STREAM), sends it to the mnode and
// blocks until the response semaphore of the request is posted.
//
// taos       : connection object (cast to STscObj*)
// streamName : stream name; copied into the SName used to build req.name
// tbName     : target table name; must be shorter than TSDB_TABLE_NAME_LEN
// sql        : statement text; must not exceed TSDB_MAX_ALLOWED_SQL_LEN
//
// Returns the request object (may be NULL when it was never built). On failure
// terrno is set, and copied into pRequest->code when a request object exists.
TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbName, const char* sql) {
  STscObj*     pTscObj = (STscObj*)taos;
  SRequestObj* pRequest = NULL;
  SQuery*      pQueryNode = NULL;
  char*        astStr = NULL;
  int32_t      sqlLen;

  terrno = TSDB_CODE_SUCCESS;
  // tbName is handed to strlen()/strcpy() below, so it is validated together with the other inputs
  if (taos == NULL || streamName == NULL || tbName == NULL || sql == NULL) {
    tscError("invalid parameters for creating stream, connObj:%p, stream name:%s, sql:%s", taos, streamName, sql);
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    goto _return;
  }
  sqlLen = strlen(sql);

  if (strlen(tbName) >= TSDB_TABLE_NAME_LEN) {
    tscError("output tb name too long, max length:%d", TSDB_TABLE_NAME_LEN - 1);
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    goto _return;
  }

  if (sqlLen > TSDB_MAX_ALLOWED_SQL_LEN) {
    tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    goto _return;
  }

  tscDebug("start to create stream: %s", streamName);

  int32_t code = 0;
  CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
  CHECK_CODE_GOTO(parseSql(pRequest, false, &pQueryNode, NULL), _return);
  CHECK_CODE_GOTO(nodesNodeToString(pQueryNode->pRoot, false, &astStr, NULL), _return);

  SName name = {.acctId = pTscObj->acctId, .type = TSDB_TABLE_NAME_T};
  strcpy(name.dbname, pRequest->pDb);
  strcpy(name.tname, streamName);

  SCMCreateStreamReq req = {
      .igExists = 1,
      .ast = astStr,
      .sql = (char*)sql,
  };
  tNameExtractFullName(&name, req.name);
  strcpy(req.targetStbFullName, tbName);

  // first call only computes the serialized length
  int   tlen = tSerializeSCMCreateStreamReq(NULL, 0, &req);
  void* buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    // report the allocation failure instead of returning with terrno still "success"
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto _return;
  }

  tSerializeSCMCreateStreamReq(buf, tlen, &req);

  pRequest->body.requestMsg = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };
  pRequest->type = TDMT_MND_CREATE_STREAM;

  SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
  SEpSet        epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  tsem_wait(&pRequest->body.rspSem);

_return:
  taosMemoryFreeClear(astStr);
  qDestroyQuery(pQueryNode);
  /*if (sendInfo != NULL) {*/
  /*destroySendMsgInfo(sendInfo);*/
  /*}*/

  if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
    pRequest->code = terrno;
  }

  return pRequest;
}
#endif
L
Liu Jicong 已提交
947

L
Liu Jicong 已提交
948
#if 0
// NOTE: compiled out by the surrounding #if 0.
// Returns the skipLogNum field of the poll response embedded in tmq_message,
// or 0 when tmq_message is NULL.
int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) {
  if (tmq_message != NULL) {
    return tmq_message->msg.skipLogNum;
  }
  return 0;
}
#endif
L
Liu Jicong 已提交
955 956

int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
X
Xiaoyu Wang 已提交
957 958
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;
  SMqClientVg*    pVg = pParam->pVg;
L
Liu Jicong 已提交
959
  SMqClientTopic* pTopic = pParam->pTopic;
X
Xiaoyu Wang 已提交
960
  tmq_t*          tmq = pParam->tmq;
L
Liu Jicong 已提交
961 962 963
  int32_t         vgId = pParam->vgId;
  int32_t         epoch = pParam->epoch;
  taosMemoryFree(pParam);
L
Liu Jicong 已提交
964
  if (code != 0) {
L
Liu Jicong 已提交
965 966
    tscWarn("msg discard from vg %d, epoch %d, code:%x", vgId, epoch, code);
    if (pMsg->pData) taosMemoryFree(pMsg->pData);
L
fix txn  
Liu Jicong 已提交
967
    goto CREATE_MSG_FAIL;
L
Liu Jicong 已提交
968 969
  }

X
Xiaoyu Wang 已提交
970 971 972
  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch < tmqEpoch) {
L
Liu Jicong 已提交
973
    // do not write into queue since updating epoch reset
L
Liu Jicong 已提交
974
    tscWarn("msg discard from vg %d since from earlier epoch, rsp epoch %d, current epoch %d", vgId, msgEpoch,
L
Liu Jicong 已提交
975
            tmqEpoch);
976
    tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
977
    taosMemoryFree(pMsg->pData);
X
Xiaoyu Wang 已提交
978 979 980 981
    return 0;
  }

  if (msgEpoch != tmqEpoch) {
L
Liu Jicong 已提交
982
    tscWarn("mismatch rsp from vg %d, epoch %d, current epoch %d", vgId, msgEpoch, tmqEpoch);
X
Xiaoyu Wang 已提交
983 984
  }

985
  SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
L
Liu Jicong 已提交
986
  if (pRspWrapper == NULL) {
L
Liu Jicong 已提交
987 988
    taosMemoryFree(pMsg->pData);
    tscWarn("msg discard from vg %d, epoch %d since out of memory", vgId, epoch);
L
fix txn  
Liu Jicong 已提交
989
    goto CREATE_MSG_FAIL;
L
Liu Jicong 已提交
990
  }
L
Liu Jicong 已提交
991

L
Liu Jicong 已提交
992 993 994
  pRspWrapper->tmqRspType = TMQ_MSG_TYPE__POLL_RSP;
  pRspWrapper->vgHandle = pVg;
  pRspWrapper->topicHandle = pTopic;
L
Liu Jicong 已提交
995

L
Liu Jicong 已提交
996 997
  memcpy(&pRspWrapper->msg, pMsg->pData, sizeof(SMqRspHead));

L
Liu Jicong 已提交
998
  tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->msg);
L
Liu Jicong 已提交
999
  taosMemoryFree(pMsg->pData);
L
Liu Jicong 已提交
1000

L
Liu Jicong 已提交
1001 1002
  tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld", tmq->consumerId, pVg->vgId,
           pRspWrapper->msg.reqOffset, pRspWrapper->msg.rspOffset);
L
fix  
Liu Jicong 已提交
1003

L
Liu Jicong 已提交
1004
  taosWriteQitem(tmq->mqueue, pRspWrapper);
1005
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
1006

L
Liu Jicong 已提交
1007
  return 0;
L
fix txn  
Liu Jicong 已提交
1008
CREATE_MSG_FAIL:
L
Liu Jicong 已提交
1009
  if (epoch == tmq->epoch) {
L
Liu Jicong 已提交
1010 1011
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }
1012
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
1013
  return -1;
1014 1015
}

L
Liu Jicong 已提交
1016
// Rebuild the consumer's topic / vgroup table from an ask-ep response and move
// the consumer to `epoch`.
//
// For every topic in pRsp->topics a new SMqClientTopic is created. If the
// current tmq->clientTopics already tracks an offset for the same
// "<topic>:<vgId>" pair, that offset is carried over; otherwise the offset
// reported in the response is used. All new vgroup entries start in
// TMQ_VG_STATUS__IDLE.
//
// Afterwards tmq->status becomes NO_TOPIC or READY depending on whether any
// topic remains, and tmq->epoch is stored atomically.
//
// Returns true if at least one vgroup entry was created; false otherwise,
// including when the initial array / hash allocation fails.
//
// NOTE(review): the previous tmq->clientTopics array is destroyed without
// releasing each entry's topicName / vgs. In-flight poll callbacks still hold
// pointers into those entries, so confirm their lifetime before freeing them.
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
  bool    set = false;
  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
  // holds "<topicName>:<vgId>"
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];
  tscDebug("consumer %ld update ep epoch %d to epoch %d, topic num: %d", tmq->consumerId, tmq->epoch, epoch,
           topicNumGet);
  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    return false;
  }
  SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pHash == NULL) {
    taosArrayDestroy(newTopics);
    return false;
  }

  // find topic, build hash
  for (int32_t i = 0; i < topicNumGet; i++) {
    SMqClientTopic topic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
    topic.schema = pTopicEp->schema;
    // the hash is reused per topic: it only ever holds offsets of the topic being processed
    taosHashClear(pHash);
    topic.topicName = strdup(pTopicEp->topic);
    tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);

    tscDebug("consumer %ld update topic: %s", tmq->consumerId, topic.topicName);
    int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
    for (int32_t j = 0; j < topicNumCur; j++) {
      // find old topic
      SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j);
      if (pTopicCur->vgs && strcmp(pTopicCur->topicName, pTopicEp->topic) == 0) {
        int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
        tscDebug("consumer %ld new vg num: %d", tmq->consumerId, vgNumCur);
        for (int32_t k = 0; k < vgNumCur; k++) {
          SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, k);
          // bounded write: never runs past vgKey even if the name is unexpectedly long
          snprintf(vgKey, sizeof(vgKey), "%s:%d", topic.topicName, pVgCur->vgId);
          tscDebug("consumer %ld epoch %d vg %d build %s", tmq->consumerId, epoch, pVgCur->vgId, vgKey);
          taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t));
        }
        // only the first matching old topic is consulted
        break;
      }
    }

    int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
    topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
    for (int32_t j = 0; j < vgNumGet; j++) {
      SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
      snprintf(vgKey, sizeof(vgKey), "%s:%d", topic.topicName, pVgEp->vgId);
      int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
      int64_t  offset = pVgEp->offset;
      tscDebug("consumer %ld epoch %d vg %d offset og to %ld", tmq->consumerId, epoch, pVgEp->vgId, offset);
      if (pOffset != NULL) {
        // prefer the offset this consumer already reached over the one in the response
        offset = *pOffset;
        tscDebug("consumer %ld epoch %d vg %d found %s", tmq->consumerId, epoch, pVgEp->vgId, vgKey);
      }
      tscDebug("consumer %ld epoch %d vg %d offset set to %ld", tmq->consumerId, epoch, pVgEp->vgId, offset);
      SMqClientVg clientVg = {
          .pollCnt = 0,
          .currentOffset = offset,
          .vgId = pVgEp->vgId,
          .epSet = pVgEp->epSet,
          .vgStatus = TMQ_VG_STATUS__IDLE,
          .vgSkipCnt = 0,
      };
      taosArrayPush(topic.vgs, &clientVg);
      set = true;
    }
    taosArrayPush(newTopics, &topic);
  }
  if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
  taosHashCleanup(pHash);
  tmq->clientTopics = newTopics;

  if (taosArrayGetSize(tmq->clientTopics) == 0)
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
  else
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);

  atomic_store_32(&tmq->epoch, epoch);
  return set;
}

L
Liu Jicong 已提交
1100
int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
1101
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
L
Liu Jicong 已提交
1102
  tmq_t*           tmq = pParam->tmq;
L
Liu Jicong 已提交
1103
  int8_t           async = pParam->async;
L
Liu Jicong 已提交
1104
  pParam->code = code;
1105
  if (code != 0) {
L
Liu Jicong 已提交
1106
    tscError("consumer %ld get topic endpoint error, not ready, wait:%d", tmq->consumerId, pParam->async);
L
Liu Jicong 已提交
1107
    goto END;
1108
  }
L
Liu Jicong 已提交
1109

L
Liu Jicong 已提交
1110
  // tmq's epoch is monotonically increase,
L
Liu Jicong 已提交
1111
  // so it's safe to discard any old epoch msg.
L
Liu Jicong 已提交
1112
  // Epoch will only increase when received newer epoch ep msg
L
Liu Jicong 已提交
1113 1114
  SMqRspHead* head = pMsg->pData;
  int32_t     epoch = atomic_load_32(&tmq->epoch);
L
temp  
Liu Jicong 已提交
1115
  tscDebug("consumer %ld recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
L
Liu Jicong 已提交
1116 1117
  if (head->epoch <= epoch) {
    goto END;
1118
  }
L
Liu Jicong 已提交
1119

L
Liu Jicong 已提交
1120
  if (!async) {
L
Liu Jicong 已提交
1121 1122
    SMqAskEpRsp rsp;
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
X
Xiaoyu Wang 已提交
1123 1124
    /*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/
    /*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/
1125
    tmqUpdateEp(tmq, head->epoch, &rsp);
L
Liu Jicong 已提交
1126
    tDeleteSMqAskEpRsp(&rsp);
X
Xiaoyu Wang 已提交
1127
  } else {
1128
    SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM);
L
Liu Jicong 已提交
1129
    if (pWrapper == NULL) {
X
Xiaoyu Wang 已提交
1130
      terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
1131 1132
      code = -1;
      goto END;
X
Xiaoyu Wang 已提交
1133
    }
L
Liu Jicong 已提交
1134 1135 1136
    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pWrapper->epoch = head->epoch;
    memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
L
Liu Jicong 已提交
1137
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);
L
Liu Jicong 已提交
1138

L
Liu Jicong 已提交
1139
    taosWriteQitem(tmq->mqueue, pWrapper);
1140
    tsem_post(&tmq->rspSem);
1141
  }
L
Liu Jicong 已提交
1142 1143

END:
L
Liu Jicong 已提交
1144
  /*atomic_store_8(&tmq->epStatus, 0);*/
L
Liu Jicong 已提交
1145
  if (!async) {
L
Liu Jicong 已提交
1146
    tsem_post(&pParam->rspSem);
L
Liu Jicong 已提交
1147 1148
  } else {
    taosMemoryFree(pParam);
L
Liu Jicong 已提交
1149 1150
  }
  return code;
1151 1152
}

L
Liu Jicong 已提交
1153
// Ask the mnode for the current topic endpoints of this consumer
// (message type TDMT_MND_MQ_ASK_EP, handled by tmqAskEpCb).
//
// tmq   : consumer handle
// async : when false, block until tmqAskEpCb posts the response semaphore and
//         return the code recorded by the callback; when true, return right
//         after the request has been handed to the transport
//
// Returns 0 (async) or the callback's code (sync); -1 if any allocation fails.
int32_t tmqAskEp(tmq_t* tmq, bool async) {
  int32_t      code = 0;
  int32_t      tlen = sizeof(SMqAskEpReq);
  SMqAskEpReq* req = taosMemoryCalloc(1, tlen);
  if (req == NULL) {
    tscError("failed to malloc get subscribe ep buf");
    return -1;
  }
  // integer fields are sent in network byte order
  req->consumerId = htobe64(tmq->consumerId);
  req->epoch = htonl(tmq->epoch);
  strcpy(req->cgroup, tmq->groupId);

  SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
  if (pParam == NULL) {
    tscError("failed to malloc subscribe param");
    taosMemoryFree(req);
    return -1;
  }
  pParam->tmq = tmq;
  pParam->async = async;
  tsem_init(&pParam->rspSem, 0, 0);

  // zero-initialised so that fields not assigned below are not left indeterminate
  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
    taosMemoryFree(req);
    return -1;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = req,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqAskEpCb;
  sendInfo->msgType = TDMT_MND_MQ_ASK_EP;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  tscDebug("consumer %ld ask ep", tmq->consumerId);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  if (!async) {
    tsem_wait(&pParam->rspSem);
    code = pParam->code;
    // release the semaphore together with the parameter block that embeds it
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  }
  // in async mode pParam is released by tmqAskEpCb and must not be touched here
  return code;
}

L
Liu Jicong 已提交
1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235
// Move the consume position of one vgroup of a subscribed topic.
//
// tmq    : consumer handle
// offset : target position; its cgroup must equal tmq->groupId
//
// Looks up the (topicName, vgId) pair in tmq->clientTopics. When found, the
// vgroup's currentOffset is overwritten, tmqClearUnhandleMsg is called and
// TMQ_RESP_ERR__SUCCESS is returned. Returns TMQ_RESP_ERR__FAIL when the
// consumer group differs or no matching vgroup exists.
tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
  const SMqOffset* target = &offset->offset;
  if (strcmp(target->cgroup, tmq->groupId) != 0) {
    return TMQ_RESP_ERR__FAIL;
  }

  int32_t topicNum = taosArrayGetSize(tmq->clientTopics);
  for (int32_t t = 0; t < topicNum; t++) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, t);
    if (strcmp(topic->topicName, target->topicName) != 0) {
      continue;
    }

    int32_t vgNum = taosArrayGetSize(topic->vgs);
    for (int32_t v = 0; v < vgNum; v++) {
      SMqClientVg* vg = taosArrayGet(topic->vgs, v);
      if (vg->vgId != target->vgId) {
        continue;
      }
      vg->currentOffset = target->offset;
      tmqClearUnhandleMsg(tmq);
      return TMQ_RESP_ERR__SUCCESS;
    }
  }
  return TMQ_RESP_ERR__FAIL;
}

1245
// Allocate and fill a poll request for one vgroup of one topic.
//
// tmq     : consumer handle (group id, consumer id, epoch, reset-offset policy)
// timeout : copied into the request
// pTopic  : topic whose name forms the second half of the subscription key
// pVg     : vgroup to poll; its currentOffset is used when it is >= 0
//
// The subscription key is "<groupId><TMQ_SEPARATOR><topicName>".
// Returns the request (caller owns it), or NULL when there is no usable offset
// (negative currentOffset with reset policy NONE) or the allocation fails.
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
  int64_t startOffset = pVg->currentOffset;
  if (startOffset < 0) {
    // nothing consumed/committed yet: fall back to the configured reset policy
    if (tmq->resetOffsetCfg == TMQ_CONF__RESET_OFFSET__NONE) {
      tscError("unable to poll since no committed offset but reset offset is set to none");
      return NULL;
    }
    startOffset = tmq->resetOffsetCfg;
  }

  SMqPollReq* pReq = taosMemoryCalloc(1, sizeof(SMqPollReq));
  if (pReq == NULL) {
    return NULL;
  }

  // subKey = groupId + separator + topicName (the final strcpy adds the terminator)
  int32_t groupLen = strlen(tmq->groupId);
  memcpy(pReq->subKey, tmq->groupId, groupLen);
  pReq->subKey[groupLen] = TMQ_SEPARATOR;
  strcpy(&pReq->subKey[groupLen + 1], pTopic->topicName);

  pReq->head.vgId = htonl(pVg->vgId);
  pReq->head.contLen = htonl(sizeof(SMqPollReq));

  pReq->consumerId = tmq->consumerId;
  pReq->epoch = tmq->epoch;
  pReq->currentOffset = startOffset;
  pReq->timeout = timeout;
  pReq->withTbName = tmq->withTbName;
  pReq->reqId = generateRequestId();
  return pReq;
}

L
Liu Jicong 已提交
1282 1283 1284
// Build the result object handed to the user from a queued poll response.
//
// pWrapper : poll response wrapper; its topicHandle / vgHandle are dereferenced
//            and its decoded message is copied by value (memcpy) into the result
//
// When the response does not carry its own schema (msg.withSchema == 0) the
// topic's schema is installed into the result info.
//
// Returns a newly allocated SMqRspObj, or NULL (with terrno set) if the
// allocation fails.
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  if (pRspObj == NULL) {
    // previously the NULL result was dereferenced right below
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  pRspObj->resType = RES_TYPE__TMQ;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  // -1: no block has been selected yet
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->msg, sizeof(SMqDataBlkRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  if (!pWrapper->msg.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  return pRspObj;
}

1300
// Send one poll request to every vgroup that is currently idle.
//
// tmq     : consumer handle
// timeout : forwarded into each poll request
//
// A vgroup is claimed by atomically switching its status from IDLE to WAIT;
// vgroups that are not idle are skipped and their skip counter is incremented.
// For each claimed vgroup a TDMT_VND_CONSUME message is sent with tmqPollCb as
// the response callback.
//
// Returns 0 after all vgroups were visited. If a request, its callback
// parameter or the send info cannot be created, the vgroup is put back to IDLE,
// tmq->rspSem is posted and -1 is returned immediately.
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      int32_t      vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
      if (vgStatus != TMQ_VG_STATUS__IDLE) {
        // not idle (the compare-exchange did not claim it): leave it alone
        int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
        tscTrace("consumer %ld epoch %d skip vg %d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId, vgSkipCnt);
        continue;
      }
      atomic_store_32(&pVg->vgSkipCnt, 0);
      SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, timeout, pTopic, pVg);
      if (pReq == NULL) {
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }
      // zero-initialised so that any field not assigned below has a defined value
      SMqPollCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqPollCbParam));
      if (pParam == NULL) {
        taosMemoryFree(pReq);
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }
      pParam->tmq = tmq;
      pParam->pVg = pVg;
      pParam->pTopic = pTopic;
      // vgId and epoch are captured by value so the callback can log / compare them
      // without going through pVg
      pParam->vgId = pVg->vgId;
      pParam->epoch = tmq->epoch;

      SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
      if (sendInfo == NULL) {
        taosMemoryFree(pReq);
        taosMemoryFree(pParam);
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }

      sendInfo->msgInfo = (SDataBuf){
          .pData = pReq,
          .len = sizeof(SMqPollReq),
          .handle = NULL,
      };
      sendInfo->requestId = pReq->reqId;
      sendInfo->requestObjRefId = 0;
      sendInfo->param = pParam;
      sendInfo->fp = tmqPollCb;
      sendInfo->msgType = TDMT_VND_CONSUME;

      int64_t transporterId = 0;
      tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %ld, reqId %lu", tmq->consumerId,
               pTopic->topicName, pVg->vgId, tmq->epoch, pVg->currentOffset, pReq->reqId);
      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
      pVg->pollCnt++;
      tmq->pollCnt++;
    }
  }
  return 0;
}

L
Liu Jicong 已提交
1373 1374
// Handle a queued response that is not a poll response.
//
// tmq        : consumer handle
// rspWrapper : queued item; only TMQ_MSG_TYPE__EP_RSP is supported
// pReset     : set to true when the endpoint table was rebuilt for a newer
//              epoch, false when the response was stale; left untouched when
//              the item type is not supported
//
// The decoded endpoint response embedded in the wrapper is released here in
// both cases (mirroring the synchronous path in tmqAskEpCb, which calls
// tDeleteSMqAskEpRsp after tmqUpdateEp); the caller still frees the queue item.
//
// Returns 0 for an endpoint response, -1 for any other item type.
int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType != TMQ_MSG_TYPE__EP_RSP) {
    return -1;
  }

  SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
  SMqAskEpRsp*        rspMsg = &pEpRspWrapper->msg;

  if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) {
    tmqUpdateEp(tmq, rspWrapper->epoch, rspMsg);
    *pReset = true;
  } else {
    *pReset = false;
  }

  // the wrapper was filled by tDecodeSMqAskEpRsp; freeing only the queue item
  // would leak what the decoder allocated
  tDeleteSMqAskEpRsp(rspMsg);
  return 0;
}

1391
// Drain queued responses until one yields data for the user.
//
// tmq         : consumer handle
// timeout     : forwarded to tmqPollImpl when a re-poll is triggered
// pollIfReset : when true, poll again right after an endpoint response rebuilt
//               the endpoint table
//
// Items are taken from tmq->qall; when it is empty it is refilled once from
// tmq->mqueue. Poll responses of the current epoch advance the vgroup offset
// and put the vgroup back to IDLE; those with at least one block are converted
// into an SMqRspObj and returned. Poll responses of another epoch and empty
// poll responses are freed and skipped. Every other item goes through
// tmqHandleNoPollRsp.
//
// Returns the result object, or NULL once both queues are empty.
SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
  for (;;) {
    SMqRspWrapper* item = NULL;
    taosGetQitem(tmq->qall, (void**)&item);
    if (item == NULL) {
      taosReadAllQitems(tmq->mqueue, tmq->qall);
      taosGetQitem(tmq->qall, (void**)&item);
      if (item == NULL) {
        return NULL;
      }
    }

    if (item->tmqRspType != TMQ_MSG_TYPE__POLL_RSP) {
      bool epChanged = false;
      tmqHandleNoPollRsp(tmq, item, &epChanged);
      taosFreeQitem(item);
      if (pollIfReset && epChanged) {
        tscDebug("consumer %ld reset and repoll", tmq->consumerId);
        tmqPollImpl(tmq, timeout);
      }
      continue;
    }

    SMqPollRspWrapper* pollItem = (SMqPollRspWrapper*)item;
    if (pollItem->msg.head.epoch != atomic_load_32(&tmq->epoch)) {
      // response belongs to another epoch: its vgroup handle is not used
      taosFreeQitem(pollItem);
      continue;
    }

    SMqClientVg* pVg = pollItem->vgHandle;
    pVg->currentOffset = pollItem->msg.rspOffset;
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

    if (pollItem->msg.blockNum == 0) {
      // offset advanced but nothing to deliver
      taosFreeQitem(pollItem);
      continue;
    }

    // build rsp
    SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollItem);
    taosFreeQitem(pollItem);
    return pRsp;
  }
}

1437
// Fetch the next batch of data for a consumer.
//
// tmq     : consumer handle
// timeout : maximum time to wait, compared against taosGetTimestampMs()
//           differences; 0 means keep polling until data arrives
//
// Already queued responses are served first. Otherwise, unless the consumer is
// still in TMQ_CONSUMER_STATUS__INIT, the function loops: run delayed tasks,
// send poll requests to idle vgroups, drain the response queue, then sleep on
// tmq->rspSem until a response is signalled or the wait expires.
//
// Returns a result object, or NULL on timeout, when the consumer is not
// initialised, or when sending poll requests fails.
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
  SMqRspObj* rspObj;
  int64_t    startTime = taosGetTimestampMs();

  rspObj = tmqHandleAllRsp(tmq, timeout, false);
  if (rspObj) {
    return (TAOS_RES*)rspObj;
  }

  // in no topic status also need process delayed task
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
    return NULL;
  }

  while (1) {
    tmqHandleAllDelayedTask(tmq);
    if (tmqPollImpl(tmq, timeout) < 0) return NULL;

    rspObj = tmqHandleAllRsp(tmq, timeout, false);
    if (rspObj) {
      return (TAOS_RES*)rspObj;
    }
    if (timeout != 0) {
      int64_t elapsed = taosGetTimestampMs() - startTime;
      if (elapsed > timeout) {
        tscDebug("consumer %ld (epoch %d) timeout, no rsp", tmq->consumerId, tmq->epoch);
        return NULL;
      }
      // wait for what is left of the budget; the previous code waited for the time
      // already spent, i.e. ~0 on the first pass and possibly past the deadline later.
      // The *1000 scaling is kept exactly as the original call used it.
      tsem_timewait(&tmq->rspSem, (timeout - elapsed) * 1000);
    } else {
      // use tsem_timewait instead of tsem_wait to avoid unexpected stuck
      tsem_timewait(&tmq->rspSem, 500 * 1000);
    }
  }
}

L
Liu Jicong 已提交
1474
// Close a consumer.
//
// When the consumer is in TMQ_CONSUMER_STATUS__READY, the current offsets are
// committed synchronously and the consumer then subscribes to an empty topic
// list. Any failure of these steps is reported as TMQ_RESP_ERR__FAIL.
// Consumers in any other state return TMQ_RESP_ERR__SUCCESS without doing
// anything.
tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) {
  // status is written with atomic_store_8 elsewhere in this file, so read it the same way
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__READY) {
    tmq_resp_err_t rsp = tmq_commit_sync(tmq, NULL);
    if (rsp == TMQ_RESP_ERR__FAIL) {
      return TMQ_RESP_ERR__FAIL;
    }

    tmq_list_t* lst = tmq_list_new();
    if (lst == NULL) {
      // cannot build the empty subscription list
      return TMQ_RESP_ERR__FAIL;
    }
    rsp = tmq_subscribe(tmq, lst);
    tmq_list_destroy(lst);

    if (rsp == TMQ_RESP_ERR__FAIL) {
      return TMQ_RESP_ERR__FAIL;
    }
  }
  // TODO: free resources
  return TMQ_RESP_ERR__SUCCESS;
}
L
Liu Jicong 已提交
1492 1493 1494 1495 1496 1497 1498

// Map a response code to a static string: "success" for
// TMQ_RESP_ERR__SUCCESS, "fail" for every other value.
const char* tmq_err2str(tmq_resp_err_t err) {
  return (err == TMQ_RESP_ERR__SUCCESS) ? "success" : "fail";
}
L
Liu Jicong 已提交
1499

L
Liu Jicong 已提交
1500
// Return the topic name of a TMQ result with everything up to and including
// the first '.' stripped.
//
// res : result handle; must satisfy TD_RES_TMQ(), otherwise NULL is returned
//
// The returned pointer refers to storage inside the result object. If the
// stored name contains no '.', the whole name is returned (previously
// strchr()'s NULL result was incremented and returned).
const char* tmq_get_topic_name(TAOS_RES* res) {
  if (!TD_RES_TMQ(res)) {
    return NULL;
  }
  SMqRspObj*  pRspObj = (SMqRspObj*)res;
  const char* dot = strchr(pRspObj->topic, '.');
  return (dot != NULL) ? dot + 1 : pRspObj->topic;
}

L
Liu Jicong 已提交
1509 1510 1511 1512 1513 1514 1515 1516 1517
// Return the database name of a TMQ result with everything up to and including
// the first '.' stripped.
//
// res : result handle; must satisfy TD_RES_TMQ(), otherwise NULL is returned
//
// The returned pointer refers to storage inside the result object. If the
// stored name contains no '.', the whole name is returned (previously
// strchr()'s NULL result was incremented and returned).
const char* tmq_get_db_name(TAOS_RES* res) {
  if (!TD_RES_TMQ(res)) {
    return NULL;
  }
  SMqRspObj*  pRspObj = (SMqRspObj*)res;
  const char* dot = strchr(pRspObj->db, '.');
  return (dot != NULL) ? dot + 1 : pRspObj->db;
}

L
Liu Jicong 已提交
1518 1519 1520 1521 1522 1523 1524 1525
// Return the vgroup id stored in a TMQ result, or -1 when `res` does not
// satisfy TD_RES_TMQ().
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (!TD_RES_TMQ(res)) {
    return -1;
  }
  return ((SMqRspObj*)res)->vgId;
}
L
Liu Jicong 已提交
1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537 1538

// Return the table name of the block the result iterator currently points at.
//
// res : result handle
//
// NULL is returned when `res` does not satisfy TD_RES_TMQ(), when the response
// carries no table names (withTbName unset or blockTbName NULL), or when
// resIter is outside [0, blockNum).
const char* tmq_get_table_name(TAOS_RES* res) {
  if (!TD_RES_TMQ(res)) {
    return NULL;
  }

  SMqRspObj* pRspObj = (SMqRspObj*)res;
  bool       hasNames = pRspObj->rsp.withTbName && pRspObj->rsp.blockTbName != NULL;
  bool       iterValid = pRspObj->resIter >= 0 && pRspObj->resIter < pRspObj->rsp.blockNum;
  if (!hasNames || !iterValid) {
    return NULL;
  }
  return taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter);
}
L
Liu Jicong 已提交
1539 1540 1541 1542 1543 1544 1545
// Commit offsets through tmqCommitInner, passing the callback `cb` and its
// user pointer `param`; the return value of tmqCommitInner is discarded.
// NOTE(review): the literal arguments 0 and 1 presumably select a non-automatic,
// asynchronous commit — confirm against tmqCommitInner's signature.
DLL_EXPORT void tmq_commit_async(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, tmq_commit_cb* cb, void* param) {
  (void)tmqCommitInner(tmq, offsets, 0, 1, cb, param);
}

// Commit offsets through tmqCommitInner without a callback and return its result.
// NOTE(review): the two literal 0 arguments presumably select a non-automatic,
// synchronous commit — confirm against tmqCommitInner's signature.
DLL_EXPORT tmq_resp_err_t tmq_commit_sync(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets) {
  tmq_resp_err_t result = tmqCommitInner(tmq, offsets, 0, 0, NULL, NULL);
  return result;
}