tmq.c 38.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
#include "planner.h"
H
Haojun Liao 已提交
20
#include "tdatablock.h"
21 22 23
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
X
Xiaoyu Wang 已提交
24
#include "tqueue.h"
25 26
#include "tref.h"

L
Liu Jicong 已提交
27 28 29 30 31 32 33 34 35 36 37
typedef struct {
  int8_t  tmqRspType;
  int32_t epoch;
} SMqRspWrapper;

typedef struct {
  int8_t           tmqRspType;
  int32_t          epoch;
  SMqCMGetSubEpRsp msg;
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
38
struct tmq_list_t {
L
Liu Jicong 已提交
39
  SArray container;
L
Liu Jicong 已提交
40
};
L
Liu Jicong 已提交
41

L
Liu Jicong 已提交
42
struct tmq_topic_vgroup_t {
L
Liu Jicong 已提交
43
  SMqOffset offset;
L
Liu Jicong 已提交
44 45 46
};

struct tmq_topic_vgroup_list_t {
L
Liu Jicong 已提交
47
  SArray container;  // SArray<tmq_topic_vgroup_t*>
L
Liu Jicong 已提交
48 49 50
};

struct tmq_conf_t {
L
Liu Jicong 已提交
51
  char           clientId[256];
L
Liu Jicong 已提交
52
  char           groupId[TSDB_CGROUP_LEN];
L
Liu Jicong 已提交
53
  int8_t         autoCommit;
L
Liu Jicong 已提交
54
  int8_t         resetOffset;
L
Liu Jicong 已提交
55
  uint16_t       port;
L
Liu Jicong 已提交
56
  uint16_t       autoCommitInterval;
L
Liu Jicong 已提交
57 58 59 60
  char*          ip;
  char*          user;
  char*          pass;
  char*          db;
L
Liu Jicong 已提交
61
  tmq_commit_cb* commit_cb;
L
Liu Jicong 已提交
62 63 64
};

struct tmq_t {
L
Liu Jicong 已提交
65
  // conf
L
Liu Jicong 已提交
66 67 68 69
  char   groupId[TSDB_CGROUP_LEN];
  char   clientId[256];
  int8_t autoCommit;
  /*int8_t         inWaiting;*/
L
Liu Jicong 已提交
70
  int64_t        consumerId;
L
Liu Jicong 已提交
71
  int32_t        epoch;
L
Liu Jicong 已提交
72
  int32_t        resetOffsetCfg;
L
Liu Jicong 已提交
73 74 75
  int64_t        status;
  STscObj*       pTscObj;
  tmq_commit_cb* commit_cb;
L
Liu Jicong 已提交
76 77 78 79 80 81 82 83 84
  /*int32_t        nextTopicIdx;*/
  int8_t  epStatus;
  int32_t epSkipCnt;
  /*int32_t        waitingRequest;*/
  /*int32_t        readyRequest;*/
  SArray*     clientTopics;  // SArray<SMqClientTopic>
  STaosQueue* mqueue;        // queue of tmq_message_t
  STaosQall*  qall;
  tsem_t      rspSem;
L
Liu Jicong 已提交
85 86
  // stat
  int64_t pollCnt;
L
Liu Jicong 已提交
87 88
};

X
Xiaoyu Wang 已提交
89 90 91 92 93 94 95 96
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT,
};

enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY,
L
Liu Jicong 已提交
97 98
};

L
Liu Jicong 已提交
99
typedef struct {
100 101 102 103
  // statistics
  int64_t pollCnt;
  // offset
  int64_t currentOffset;
L
Liu Jicong 已提交
104
  // connection info
105
  int32_t vgId;
X
Xiaoyu Wang 已提交
106
  int32_t vgStatus;
L
Liu Jicong 已提交
107
  int32_t vgSkipCnt;
108 109 110
  SEpSet  epSet;
} SMqClientVg;

L
Liu Jicong 已提交
111
typedef struct {
112
  // subscribe info
L
Liu Jicong 已提交
113 114 115 116 117 118 119 120
  int32_t        sqlLen;
  char*          sql;
  char*          topicName;
  int64_t        topicId;
  SArray*        vgs;  // SArray<SMqClientVg>
  int8_t         isSchemaAdaptive;
  int32_t        numOfFields;
  SSchemaWrapper schema;
121 122
} SMqClientTopic;

L
Liu Jicong 已提交
123 124 125 126 127
typedef struct {
  int8_t          tmqRspType;
  int32_t         epoch;
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
L
Liu Jicong 已提交
128
  SMqDataBlkRsp   msg;
L
Liu Jicong 已提交
129 130
} SMqPollRspWrapper;

L
Liu Jicong 已提交
131
typedef struct {
L
Liu Jicong 已提交
132 133 134 135
  tmq_t*         tmq;
  tsem_t         rspSem;
  tmq_resp_err_t rspErr;
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
136

L
Liu Jicong 已提交
137
typedef struct {
138
  tmq_t*  tmq;
L
Liu Jicong 已提交
139
  int32_t code;
X
Xiaoyu Wang 已提交
140 141
  int32_t sync;
  tsem_t  rspSem;
142 143
} SMqAskEpCbParam;

L
Liu Jicong 已提交
144
typedef struct {
L
Liu Jicong 已提交
145 146
  tmq_t*          tmq;
  SMqClientVg*    pVg;
L
Liu Jicong 已提交
147
  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
148
  int32_t         epoch;
L
Liu Jicong 已提交
149
  int32_t         vgId;
L
Liu Jicong 已提交
150 151
  tsem_t          rspSem;
  int32_t         sync;
X
Xiaoyu Wang 已提交
152
} SMqPollCbParam;
153

L
Liu Jicong 已提交
154
typedef struct {
L
Liu Jicong 已提交
155
  tmq_t*         tmq;
L
Liu Jicong 已提交
156
  int32_t        async;
L
Liu Jicong 已提交
157 158
  tsem_t         rspSem;
  tmq_resp_err_t rspErr;
L
Liu Jicong 已提交
159
  /*SMqClientVg* pVg;*/
L
Liu Jicong 已提交
160
} SMqCommitCbParam;
L
Liu Jicong 已提交
161

162
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
163
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
L
Liu Jicong 已提交
164
  conf->autoCommit = false;
X
Xiaoyu Wang 已提交
165
  conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
166 167 168
  return conf;
}

L
Liu Jicong 已提交
169
void tmq_conf_destroy(tmq_conf_t* conf) {
wafwerar's avatar
wafwerar 已提交
170
  if (conf) taosMemoryFree(conf);
L
Liu Jicong 已提交
171 172 173
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
174 175
  if (strcmp(key, "group.id") == 0) {
    strcpy(conf->groupId, value);
L
Liu Jicong 已提交
176
    return TMQ_CONF_OK;
177
  }
L
Liu Jicong 已提交
178

179 180
  if (strcmp(key, "client.id") == 0) {
    strcpy(conf->clientId, value);
L
Liu Jicong 已提交
181 182
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
183

L
Liu Jicong 已提交
184 185
  if (strcmp(key, "enable.auto.commit") == 0) {
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
186
      conf->autoCommit = true;
L
Liu Jicong 已提交
187 188
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
189
      conf->autoCommit = false;
L
Liu Jicong 已提交
190 191 192 193
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
194
  }
L
Liu Jicong 已提交
195

L
Liu Jicong 已提交
196 197 198 199 200
  if (strcmp(key, "auto.commit.interval.ms") == 0) {
    conf->autoCommitInterval = atoi(value);
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
201 202 203 204 205 206 207 208 209 210 211 212 213 214
  if (strcmp(key, "auto.offset.reset") == 0) {
    if (strcmp(value, "none") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "earliest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
215

L
Liu Jicong 已提交
216
  if (strcmp(key, "td.connect.ip") == 0) {
L
Liu Jicong 已提交
217 218 219
    conf->ip = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
220
  if (strcmp(key, "td.connect.user") == 0) {
L
Liu Jicong 已提交
221 222 223
    conf->user = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
224
  if (strcmp(key, "td.connect.pass") == 0) {
L
Liu Jicong 已提交
225 226 227
    conf->pass = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
228
  if (strcmp(key, "td.connect.port") == 0) {
L
Liu Jicong 已提交
229 230 231
    conf->port = atoi(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
232
  if (strcmp(key, "td.connect.db") == 0) {
L
Liu Jicong 已提交
233 234 235 236
    conf->db = strdup(value);
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
237
  return TMQ_CONF_UNKNOWN;
238 239 240
}

tmq_list_t* tmq_list_new() {
L
Liu Jicong 已提交
241 242
  //
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
243 244
}

L
Liu Jicong 已提交
245 246 247
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  SArray* container = &list->container;
  char*   topic = strdup(src);
L
fix  
Liu Jicong 已提交
248
  if (taosArrayPush(container, &topic) == NULL) return -1;
249 250 251
  return 0;
}

L
Liu Jicong 已提交
252
void tmq_list_destroy(tmq_list_t* list) {
L
Liu Jicong 已提交
253
  SArray* container = &list->container;
L
fix  
Liu Jicong 已提交
254
  /*taosArrayDestroy(container);*/
255 256 257 258 259 260
  int32_t sz = taosArrayGetSize(container);
  for (int32_t i = 0; i < sz; i++) {
    char* str = taosArrayGetP(container, i);
    taosMemoryFree(str);
  }
  taosArrayDestroy(container);
L
Liu Jicong 已提交
261 262
}

L
Liu Jicong 已提交
263 264 265 266
static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
  return sprintf(dst, "%s:%d", topicName, vg);
}

L
Liu Jicong 已提交
267
void tmqClearUnhandleMsg(tmq_t* tmq) {
L
Liu Jicong 已提交
268
  SMqRspWrapper* msg = NULL;
L
Liu Jicong 已提交
269 270 271 272 273 274 275 276
  while (1) {
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg)
      taosFreeQitem(msg);
    else
      break;
  }

L
fix  
Liu Jicong 已提交
277
  msg = NULL;
L
Liu Jicong 已提交
278 279 280 281 282 283 284 285 286 287
  taosReadAllQitems(tmq->mqueue, tmq->qall);
  while (1) {
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg)
      taosFreeQitem(msg);
    else
      break;
  }
}

L
Liu Jicong 已提交
288 289 290 291 292 293
int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) {
  SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
  pParam->rspErr = code;
  tsem_post(&pParam->rspSem);
  return 0;
}
294

L
Liu Jicong 已提交
295
int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
L
Liu Jicong 已提交
296
  SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
L
Liu Jicong 已提交
297
  pParam->rspErr = code == 0 ? TMQ_RESP_ERR__SUCCESS : TMQ_RESP_ERR__FAIL;
L
Liu Jicong 已提交
298
  if (pParam->tmq->commit_cb) {
L
Liu Jicong 已提交
299
    pParam->tmq->commit_cb(pParam->tmq, pParam->rspErr, NULL);
L
Liu Jicong 已提交
300
  }
L
fix  
Liu Jicong 已提交
301
  if (!pParam->async) tsem_post(&pParam->rspSem);
L
Liu Jicong 已提交
302 303 304
  return 0;
}

X
Xiaoyu Wang 已提交
305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320
tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  if (*topics == NULL) {
    *topics = tmq_list_new();
  }
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* topic = taosArrayGetP(tmq->clientTopics, i);
    tmq_list_append(*topics, strdup(topic->topicName));
  }
  return TMQ_RESP_ERR__SUCCESS;
}

tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq) {
  tmq_list_t* lst = tmq_list_new();
  return tmq_subscribe(tmq, lst);
}

L
Liu Jicong 已提交
321
#if 0
322
tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
wafwerar's avatar
wafwerar 已提交
323
  tmq_t* pTmq = taosMemoryCalloc(sizeof(tmq_t), 1);
324 325 326 327
  if (pTmq == NULL) {
    return NULL;
  }
  pTmq->pTscObj = (STscObj*)conn;
L
Liu Jicong 已提交
328
  /*pTmq->inWaiting = 0;*/
329 330
  pTmq->status = 0;
  pTmq->pollCnt = 0;
L
Liu Jicong 已提交
331
  pTmq->epoch = 0;
L
Liu Jicong 已提交
332 333
  /*pTmq->waitingRequest = 0;*/
  /*pTmq->readyRequest = 0;*/
L
fix txn  
Liu Jicong 已提交
334
  pTmq->epStatus = 0;
L
temp  
Liu Jicong 已提交
335
  pTmq->epSkipCnt = 0;
L
Liu Jicong 已提交
336
  // set conf
337 338
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
L
Liu Jicong 已提交
339
  pTmq->autoCommit = conf->autoCommit;
340
  pTmq->commit_cb = conf->commit_cb;
L
Liu Jicong 已提交
341
  pTmq->resetOffsetCfg = conf->resetOffset;
L
Liu Jicong 已提交
342

L
Liu Jicong 已提交
343 344 345 346 347 348 349 350 351 352
  pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1);
  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  if (pTmq->clientTopics == NULL) {
    taosMemoryFree(pTmq);
    return NULL;
  }

  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();

L
Liu Jicong 已提交
353 354
  tsem_init(&pTmq->rspSem, 0, 0);

L
Liu Jicong 已提交
355 356
  return pTmq;
}
L
Liu Jicong 已提交
357
#endif
L
Liu Jicong 已提交
358

L
Liu Jicong 已提交
359
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
L
Liu Jicong 已提交
360 361 362 363
  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    return NULL;
  }
L
Liu Jicong 已提交
364 365 366 367 368 369
  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  ASSERT(user);
  ASSERT(pass);
  ASSERT(conf->db);
L
Liu Jicong 已提交
370
  ASSERT(conf->groupId[0]);
L
Liu Jicong 已提交
371 372 373

  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, conf->db, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) return NULL;
L
Liu Jicong 已提交
374

L
Liu Jicong 已提交
375
  /*pTmq->inWaiting = 0;*/
L
Liu Jicong 已提交
376 377 378
  pTmq->status = 0;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;
L
Liu Jicong 已提交
379 380
  /*pTmq->waitingRequest = 0;*/
  /*pTmq->readyRequest = 0;*/
L
temp  
Liu Jicong 已提交
381 382
  pTmq->epStatus = 0;
  pTmq->epSkipCnt = 0;
L
Liu Jicong 已提交
383 384 385 386 387 388 389
  // set conf
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
  pTmq->autoCommit = conf->autoCommit;
  pTmq->commit_cb = conf->commit_cb;
  pTmq->resetOffsetCfg = conf->resetOffset;

L
Liu Jicong 已提交
390
  pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1);
391
  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
L
Liu Jicong 已提交
392 393 394 395
  if (pTmq->clientTopics == NULL) {
    taosMemoryFree(pTmq);
    return NULL;
  }
X
Xiaoyu Wang 已提交
396 397 398

  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();
L
Liu Jicong 已提交
399 400 401

  tsem_init(&pTmq->rspSem, 0, 0);

402 403 404
  return pTmq;
}

L
Liu Jicong 已提交
405 406 407 408
tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) {
  // TODO: add read write lock
  SRequestObj*   pRequest = NULL;
  tmq_resp_err_t resp = TMQ_RESP_ERR__SUCCESS;
L
Liu Jicong 已提交
409 410
  // build msg
  // send to mnode
L
Liu Jicong 已提交
411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430
  SMqCMCommitOffsetReq req;
  SArray*              pArray = NULL;

  if (offsets == NULL) {
    pArray = taosArrayInit(0, sizeof(SMqOffset));
    for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
      SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
      for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
        SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
        SMqOffset    offset;
        strcpy(offset.topicName, pTopic->topicName);
        strcpy(offset.cgroup, tmq->groupId);
        offset.vgId = pVg->vgId;
        offset.offset = pVg->currentOffset;
        taosArrayPush(pArray, &offset);
      }
    }
    req.num = pArray->size;
    req.offsets = pArray->pData;
  } else {
L
Liu Jicong 已提交
431 432
    req.num = taosArrayGetSize(&offsets->container);
    req.offsets = (SMqOffset*)offsets->container.pData;
L
Liu Jicong 已提交
433
  }
L
Liu Jicong 已提交
434 435 436 437

  SCoder encoder;

  tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
L
Liu Jicong 已提交
438
  tEncodeSMqCMCommitOffsetReq(&encoder, &req);
L
Liu Jicong 已提交
439
  int32_t tlen = encoder.pos;
wafwerar's avatar
wafwerar 已提交
440
  void*   buf = taosMemoryMalloc(tlen);
L
Liu Jicong 已提交
441 442 443 444 445 446 447
  if (buf == NULL) {
    tCoderClear(&encoder);
    return -1;
  }
  tCoderClear(&encoder);

  tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, tlen, TD_ENCODER);
L
Liu Jicong 已提交
448
  tEncodeSMqCMCommitOffsetReq(&encoder, &req);
L
Liu Jicong 已提交
449 450
  tCoderClear(&encoder);

L
Liu Jicong 已提交
451
  pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_MQ_COMMIT_OFFSET);
L
Liu Jicong 已提交
452 453 454 455
  if (pRequest == NULL) {
    tscError("failed to malloc request");
  }

wafwerar's avatar
wafwerar 已提交
456
  SMqCommitCbParam* pParam = taosMemoryMalloc(sizeof(SMqCommitCbParam));
L
Liu Jicong 已提交
457 458 459 460 461
  if (pParam == NULL) {
    return -1;
  }
  pParam->tmq = tmq;
  tsem_init(&pParam->rspSem, 0, 0);
L
fix  
Liu Jicong 已提交
462
  pParam->async = async;
L
Liu Jicong 已提交
463

X
Xiaoyu Wang 已提交
464 465 466 467 468
  pRequest->body.requestMsg = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };
L
Liu Jicong 已提交
469 470

  SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
L
Liu Jicong 已提交
471 472
  sendInfo->param = pParam;
  sendInfo->fp = tmqCommitCb;
L
Liu Jicong 已提交
473 474 475 476 477
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

L
Liu Jicong 已提交
478 479 480 481
  if (!async) {
    tsem_wait(&pParam->rspSem);
    resp = pParam->rspErr;
  }
L
Liu Jicong 已提交
482

L
fix  
Liu Jicong 已提交
483
  tsem_destroy(&pParam->rspSem);
wafwerar's avatar
wafwerar 已提交
484
  taosMemoryFree(pParam);
L
fix  
Liu Jicong 已提交
485

L
Liu Jicong 已提交
486 487 488 489 490
  if (pArray) {
    taosArrayDestroy(pArray);
  }

  return resp;
L
Liu Jicong 已提交
491 492
}

L
Liu Jicong 已提交
493
tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
L
Liu Jicong 已提交
494
  SRequestObj* pRequest = NULL;
L
Liu Jicong 已提交
495 496
  SArray*      container = &topic_list->container;
  int32_t      sz = taosArrayGetSize(container);
L
Liu Jicong 已提交
497
  // destroy ex
498 499 500 501 502 503
  taosArrayDestroy(tmq->clientTopics);
  tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic));

  SCMSubscribeReq req;
  req.topicNum = sz;
  req.consumerId = tmq->consumerId;
504
  strcpy(req.cgroup, tmq->groupId);
505 506 507
  req.topicNames = taosArrayInit(sz, sizeof(void*));

  for (int i = 0; i < sz; i++) {
L
Liu Jicong 已提交
508 509
    /*char* topicName = topic_list->elems[i];*/
    char* topicName = taosArrayGetP(container, i);
510 511 512

    SName name = {0};
    char* dbName = getDbOfConnection(tmq->pTscObj);
L
Liu Jicong 已提交
513 514 515
    if (dbName == NULL) {
      return TMQ_RESP_ERR__FAIL;
    }
L
Liu Jicong 已提交
516
    tNameSetDbName(&name, tmq->pTscObj->acctId, dbName, strlen(dbName));
517 518
    tNameFromString(&name, topicName, T_NAME_TABLE);

wafwerar's avatar
wafwerar 已提交
519
    char* topicFname = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
520
    if (topicFname == NULL) {
L
Liu Jicong 已提交
521
      goto _return;
522 523 524 525
    }
    tNameExtractFullName(&name, topicFname);
    tscDebug("subscribe topic: %s", topicFname);
    SMqClientTopic topic = {
L
Liu Jicong 已提交
526 527 528 529 530 531
        .sql = NULL,
        .sqlLen = 0,
        .topicId = 0,
        .topicName = topicFname,
        .vgs = NULL,
    };
532
    topic.vgs = taosArrayInit(0, sizeof(SMqClientVg));
L
Liu Jicong 已提交
533
    taosArrayPush(tmq->clientTopics, &topic);
534
    taosArrayPush(req.topicNames, &topicFname);
wafwerar's avatar
wafwerar 已提交
535
    taosMemoryFree(dbName);
536 537
  }

L
Liu Jicong 已提交
538
  int   tlen = tSerializeSCMSubscribeReq(NULL, &req);
wafwerar's avatar
wafwerar 已提交
539
  void* buf = taosMemoryMalloc(tlen);
L
Liu Jicong 已提交
540
  if (buf == NULL) {
541 542 543 544 545 546 547 548 549
    goto _return;
  }

  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);
  /*printf("formatted: %s\n", dagStr);*/

  pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_SUBSCRIBE);
  if (pRequest == NULL) {
L
Liu Jicong 已提交
550
    tscError("failed to malloc request");
551 552
  }

X
Xiaoyu Wang 已提交
553 554 555 556
  SMqSubscribeCbParam param = {
      .rspErr = TMQ_RESP_ERR__SUCCESS,
      .tmq = tmq,
  };
L
Liu Jicong 已提交
557 558
  tsem_init(&param.rspSem, 0, 0);

X
Xiaoyu Wang 已提交
559 560 561 562 563
  pRequest->body.requestMsg = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };
564 565

  SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
L
Liu Jicong 已提交
566 567
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
568 569 570 571 572
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

L
Liu Jicong 已提交
573 574
  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);
575 576 577

_return:
  /*if (sendInfo != NULL) {*/
L
Liu Jicong 已提交
578
  /*destroySendMsgInfo(sendInfo);*/
579 580
  /*}*/

L
Liu Jicong 已提交
581
  return param.rspErr;
582 583
}

L
Liu Jicong 已提交
584
void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) { conf->commit_cb = cb; }
585

L
Liu Jicong 已提交
586 587 588 589 590 591 592 593 594 595 596 597 598 599 600
TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbName, const char* sql) {
  STscObj*     pTscObj = (STscObj*)taos;
  SRequestObj* pRequest = NULL;
  SQuery*      pQueryNode = NULL;
  char*        astStr = NULL;
  int32_t      sqlLen;

  terrno = TSDB_CODE_SUCCESS;
  if (taos == NULL || streamName == NULL || sql == NULL) {
    tscError("invalid parameters for creating stream, connObj:%p, stream name:%s, sql:%s", taos, streamName, sql);
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    goto _return;
  }
  sqlLen = strlen(sql);

L
Liu Jicong 已提交
601 602
  if (strlen(tbName) >= TSDB_TABLE_NAME_LEN) {
    tscError("output tb name too long, max length:%d", TSDB_TABLE_NAME_LEN - 1);
L
Liu Jicong 已提交
603 604 605 606 607 608 609 610 611 612 613 614
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    goto _return;
  }

  if (sqlLen > TSDB_MAX_ALLOWED_SQL_LEN) {
    tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    goto _return;
  }

  tscDebug("start to create stream: %s", streamName);

H
Haojun Liao 已提交
615
  int32_t code = 0;
L
Liu Jicong 已提交
616
  CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
D
stmt  
dapan1121 已提交
617
  CHECK_CODE_GOTO(parseSql(pRequest, false, &pQueryNode, NULL), _return);
L
Liu Jicong 已提交
618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634

  // todo check for invalid sql statement and return with error code

  CHECK_CODE_GOTO(nodesNodeToString(pQueryNode->pRoot, false, &astStr, NULL), _return);

  /*printf("%s\n", pStr);*/

  SName name = {.acctId = pTscObj->acctId, .type = TSDB_TABLE_NAME_T};
  strcpy(name.dbname, pRequest->pDb);
  strcpy(name.tname, streamName);

  SCMCreateStreamReq req = {
      .igExists = 1,
      .ast = astStr,
      .sql = (char*)sql,
  };
  tNameExtractFullName(&name, req.name);
L
Liu Jicong 已提交
635
  strcpy(req.outputSTbName, tbName);
L
Liu Jicong 已提交
636 637

  int   tlen = tSerializeSCMCreateStreamReq(NULL, 0, &req);
wafwerar's avatar
wafwerar 已提交
638
  void* buf = taosMemoryMalloc(tlen);
L
Liu Jicong 已提交
639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661
  if (buf == NULL) {
    goto _return;
  }

  tSerializeSCMCreateStreamReq(buf, tlen, &req);
  /*printf("formatted: %s\n", dagStr);*/

  pRequest->body.requestMsg = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };
  pRequest->type = TDMT_MND_CREATE_STREAM;

  SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
  SEpSet        epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  tsem_wait(&pRequest->body.rspSem);

_return:
wafwerar's avatar
wafwerar 已提交
662
  taosMemoryFreeClear(astStr);
L
Liu Jicong 已提交
663 664 665 666 667 668 669 670 671 672 673 674
  qDestroyQuery(pQueryNode);
  /*if (sendInfo != NULL) {*/
  /*destroySendMsgInfo(sendInfo);*/
  /*}*/

  if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
    pRequest->code = terrno;
  }

  return pRequest;
}

L
Liu Jicong 已提交
675
#if 0
L
Liu Jicong 已提交
676 677 678
TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, int sqlLen) {
  STscObj*     pTscObj = (STscObj*)taos;
  SRequestObj* pRequest = NULL;
L
Liu Jicong 已提交
679
  SQuery*      pQueryNode = NULL;
L
Liu Jicong 已提交
680
  char*        astStr = NULL;
681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700

  terrno = TSDB_CODE_SUCCESS;
  if (taos == NULL || topicName == NULL || sql == NULL) {
    tscError("invalid parameters for creating topic, connObj:%p, topic name:%s, sql:%s", taos, topicName, sql);
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    goto _return;
  }

  if (strlen(topicName) >= TSDB_TOPIC_NAME_LEN) {
    tscError("topic name too long, max length:%d", TSDB_TOPIC_NAME_LEN - 1);
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    goto _return;
  }

  if (sqlLen > TSDB_MAX_ALLOWED_SQL_LEN) {
    tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    goto _return;
  }

L
Liu Jicong 已提交
701
  tscDebug("start to create topic: %s", topicName);
702

X
Xiaoyu Wang 已提交
703
  int32_t code = TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
704 705
  CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
  CHECK_CODE_GOTO(parseSql(pRequest, true, &pQueryNode), _return);
706 707 708

  // todo check for invalid sql statement and return with error code

L
Liu Jicong 已提交
709
  CHECK_CODE_GOTO(nodesNodeToString(pQueryNode->pRoot, false, &astStr, NULL), _return);
710

L
Liu Jicong 已提交
711
  /*printf("%s\n", pStr);*/
712

L
Liu Jicong 已提交
713
  SName name = {.acctId = pTscObj->acctId, .type = TSDB_TABLE_NAME_T};
X
Xiaoyu Wang 已提交
714 715
  strcpy(name.dbname, pRequest->pDb);
  strcpy(name.tname, topicName);
716

L
Liu Jicong 已提交
717
  SCMCreateTopicReq req = {
L
Liu Jicong 已提交
718
      .igExists = 1,
L
Liu Jicong 已提交
719
      .ast = astStr,
L
Liu Jicong 已提交
720
      .sql = (char*)sql,
721
  };
L
Liu Jicong 已提交
722
  tNameExtractFullName(&name, req.name);
723

L
Liu Jicong 已提交
724
  int   tlen = tSerializeSCMCreateTopicReq(NULL, 0, &req);
wafwerar's avatar
wafwerar 已提交
725
  void* buf = taosMemoryMalloc(tlen);
726 727 728 729
  if (buf == NULL) {
    goto _return;
  }

L
Liu Jicong 已提交
730
  tSerializeSCMCreateTopicReq(buf, tlen, &req);
731 732
  /*printf("formatted: %s\n", dagStr);*/

L
Liu Jicong 已提交
733 734 735 736 737
  pRequest->body.requestMsg = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };
738 739 740
  pRequest->type = TDMT_MND_CREATE_TOPIC;

  SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
L
Liu Jicong 已提交
741
  SEpSet        epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
742 743 744 745 746

  int64_t transporterId = 0;
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  tsem_wait(&pRequest->body.rspSem);
X
Xiaoyu Wang 已提交
747

748
_return:
wafwerar's avatar
wafwerar 已提交
749
  taosMemoryFreeClear(astStr);
750 751
  qDestroyQuery(pQueryNode);
  /*if (sendInfo != NULL) {*/
L
Liu Jicong 已提交
752
  /*destroySendMsgInfo(sendInfo);*/
753 754 755 756 757 758 759 760
  /*}*/

  if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
    pRequest->code = terrno;
  }

  return pRequest;
}
L
Liu Jicong 已提交
761
#endif
762

L
Liu Jicong 已提交
763
#if 0
L
Liu Jicong 已提交
764 765
int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) {
  if (tmq_message == NULL) return 0;
L
Liu Jicong 已提交
766
  SMqPollRsp* pRsp = &tmq_message->msg;
L
Liu Jicong 已提交
767 768
  return pRsp->skipLogNum;
}
L
Liu Jicong 已提交
769
#endif
L
Liu Jicong 已提交
770 771

int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
X
Xiaoyu Wang 已提交
772 773
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;
  SMqClientVg*    pVg = pParam->pVg;
L
Liu Jicong 已提交
774
  SMqClientTopic* pTopic = pParam->pTopic;
X
Xiaoyu Wang 已提交
775
  tmq_t*          tmq = pParam->tmq;
L
Liu Jicong 已提交
776
  if (code != 0) {
L
Liu Jicong 已提交
777
    tscWarn("msg discard from vg %d, epoch %d, code:%x", pParam->vgId, pParam->epoch, code);
L
fix txn  
Liu Jicong 已提交
778
    goto CREATE_MSG_FAIL;
L
Liu Jicong 已提交
779 780
  }

X
Xiaoyu Wang 已提交
781 782 783
  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch < tmqEpoch) {
L
Liu Jicong 已提交
784
    // do not write into queue since updating epoch reset
L
Liu Jicong 已提交
785 786
    tscWarn("msg discard from vg %d since from earlier epoch, rsp epoch %d, current epoch %d", pParam->vgId, msgEpoch,
            tmqEpoch);
L
Liu Jicong 已提交
787
    /*tsem_post(&tmq->rspSem);*/
X
Xiaoyu Wang 已提交
788 789 790 791
    return 0;
  }

  if (msgEpoch != tmqEpoch) {
L
Liu Jicong 已提交
792
    tscWarn("mismatch rsp from vg %d, epoch %d, current epoch %d", pParam->vgId, msgEpoch, tmqEpoch);
X
Xiaoyu Wang 已提交
793 794
  }

L
Liu Jicong 已提交
795
#if 0
L
Liu Jicong 已提交
796
  if (pParam->sync == 1) {
wafwerar's avatar
wafwerar 已提交
797
    /**pParam->msg = taosMemoryMalloc(sizeof(tmq_message_t));*/
L
Liu Jicong 已提交
798 799 800 801 802 803 804 805 806 807 808 809 810 811
    *pParam->msg = taosAllocateQitem(sizeof(tmq_message_t));
    if (*pParam->msg) {
      memcpy(*pParam->msg, pMsg->pData, sizeof(SMqRspHead));
      tDecodeSMqConsumeRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &((*pParam->msg)->consumeRsp));
      if ((*pParam->msg)->consumeRsp.numOfTopics != 0) {
        pVg->currentOffset = (*pParam->msg)->consumeRsp.rspOffset;
      }
      taosWriteQitem(tmq->mqueue, *pParam->msg);
      tsem_post(&pParam->rspSem);
      return 0;
    }
    tsem_post(&pParam->rspSem);
    return -1;
  }
L
Liu Jicong 已提交
812
#endif
L
Liu Jicong 已提交
813

L
Liu Jicong 已提交
814 815
  SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper));
  if (pRspWrapper == NULL) {
L
Liu Jicong 已提交
816
    tscWarn("msg discard from vg %d, epoch %d since out of memory", pParam->vgId, pParam->epoch);
L
fix txn  
Liu Jicong 已提交
817
    goto CREATE_MSG_FAIL;
L
Liu Jicong 已提交
818
  }
L
Liu Jicong 已提交
819

L
Liu Jicong 已提交
820 821 822
  pRspWrapper->tmqRspType = TMQ_MSG_TYPE__POLL_RSP;
  pRspWrapper->vgHandle = pVg;
  pRspWrapper->topicHandle = pTopic;
L
Liu Jicong 已提交
823

L
Liu Jicong 已提交
824 825
  memcpy(&pRspWrapper->msg, pMsg->pData, sizeof(SMqRspHead));

L
Liu Jicong 已提交
826
  tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->msg);
L
Liu Jicong 已提交
827

L
Liu Jicong 已提交
828 829
  tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld", tmq->consumerId, pVg->vgId,
           pRspWrapper->msg.reqOffset, pRspWrapper->msg.rspOffset);
L
fix  
Liu Jicong 已提交
830

L
Liu Jicong 已提交
831
  taosWriteQitem(tmq->mqueue, pRspWrapper);
L
temp  
Liu Jicong 已提交
832
  /*tsem_post(&tmq->rspSem);*/
L
Liu Jicong 已提交
833

L
Liu Jicong 已提交
834
  return 0;
L
fix txn  
Liu Jicong 已提交
835
CREATE_MSG_FAIL:
L
Liu Jicong 已提交
836 837 838
  if (pParam->epoch == tmq->epoch) {
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }
L
temp  
Liu Jicong 已提交
839
  /*tsem_post(&tmq->rspSem);*/
L
Liu Jicong 已提交
840
  return -1;
841 842
}

X
Xiaoyu Wang 已提交
843
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
L
Liu Jicong 已提交
844
  /*printf("call update ep %d\n", epoch);*/
X
Xiaoyu Wang 已提交
845
  bool    set = false;
L
Liu Jicong 已提交
846 847
  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];
L
Liu Jicong 已提交
848 849
  tscDebug("consumer %ld update ep epoch %d to epoch %d, topic num: %d", tmq->consumerId, tmq->epoch, epoch,
           topicNumGet);
L
Liu Jicong 已提交
850 851 852 853 854 855 856 857 858 859 860 861
  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    return false;
  }
  SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pHash == NULL) {
    taosArrayDestroy(newTopics);
    return false;
  }

  // find topic, build hash
  for (int32_t i = 0; i < topicNumGet; i++) {
X
Xiaoyu Wang 已提交
862 863
    SMqClientTopic topic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
L
Liu Jicong 已提交
864
    topic.schema = pTopicEp->schema;
L
Liu Jicong 已提交
865
    taosHashClear(pHash);
X
Xiaoyu Wang 已提交
866
    topic.topicName = strdup(pTopicEp->topic);
L
Liu Jicong 已提交
867

L
Liu Jicong 已提交
868
    tscDebug("consumer %ld update topic: %s", tmq->consumerId, topic.topicName);
L
Liu Jicong 已提交
869 870 871 872 873 874
    int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
    for (int32_t j = 0; j < topicNumCur; j++) {
      // find old topic
      SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j);
      if (pTopicCur->vgs && strcmp(pTopicCur->topicName, pTopicEp->topic) == 0) {
        int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
L
Liu Jicong 已提交
875
        tscDebug("consumer %ld new vg num: %d", tmq->consumerId, vgNumCur);
L
Liu Jicong 已提交
876 877 878 879
        if (vgNumCur == 0) break;
        for (int32_t k = 0; k < vgNumCur; k++) {
          SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, k);
          sprintf(vgKey, "%s:%d", topic.topicName, pVgCur->vgId);
L
Liu Jicong 已提交
880
          tscDebug("consumer %ld epoch %d vg %d build %s", tmq->consumerId, epoch, pVgCur->vgId, vgKey);
L
Liu Jicong 已提交
881 882 883 884 885 886 887 888 889
          taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t));
        }
        break;
      }
    }

    int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
    topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
    for (int32_t j = 0; j < vgNumGet; j++) {
X
Xiaoyu Wang 已提交
890
      SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
L
Liu Jicong 已提交
891 892 893
      sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId);
      int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
      int64_t  offset = pVgEp->offset;
L
Liu Jicong 已提交
894
      tscDebug("consumer %ld epoch %d vg %d offset og to %ld", tmq->consumerId, epoch, pVgEp->vgId, offset);
L
Liu Jicong 已提交
895 896
      if (pOffset != NULL) {
        offset = *pOffset;
L
Liu Jicong 已提交
897
        tscDebug("consumer %ld epoch %d vg %d found %s", tmq->consumerId, epoch, pVgEp->vgId, vgKey);
L
Liu Jicong 已提交
898
      }
L
Liu Jicong 已提交
899
      tscDebug("consumer %ld epoch %d vg %d offset set to %ld", tmq->consumerId, epoch, pVgEp->vgId, offset);
X
Xiaoyu Wang 已提交
900 901
      SMqClientVg clientVg = {
          .pollCnt = 0,
L
Liu Jicong 已提交
902
          .currentOffset = offset,
X
Xiaoyu Wang 已提交
903 904 905
          .vgId = pVgEp->vgId,
          .epSet = pVgEp->epSet,
          .vgStatus = TMQ_VG_STATUS__IDLE,
L
Liu Jicong 已提交
906
          .vgSkipCnt = 0,
X
Xiaoyu Wang 已提交
907 908 909 910
      };
      taosArrayPush(topic.vgs, &clientVg);
      set = true;
    }
L
Liu Jicong 已提交
911
    taosArrayPush(newTopics, &topic);
X
Xiaoyu Wang 已提交
912
  }
L
Liu Jicong 已提交
913
  if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
L
Liu Jicong 已提交
914
  taosHashCleanup(pHash);
L
Liu Jicong 已提交
915
  tmq->clientTopics = newTopics;
X
Xiaoyu Wang 已提交
916 917 918 919
  atomic_store_32(&tmq->epoch, epoch);
  return set;
}

L
Liu Jicong 已提交
920
int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
921
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
L
Liu Jicong 已提交
922
  tmq_t*           tmq = pParam->tmq;
L
Liu Jicong 已提交
923
  pParam->code = code;
924
  if (code != 0) {
L
temp  
Liu Jicong 已提交
925
    tscError("consumer %ld get topic endpoint error, not ready, wait:%d", tmq->consumerId, pParam->sync);
L
Liu Jicong 已提交
926
    goto END;
927
  }
L
Liu Jicong 已提交
928

L
Liu Jicong 已提交
929
  // tmq's epoch is monotonically increase,
L
Liu Jicong 已提交
930
  // so it's safe to discard any old epoch msg.
L
Liu Jicong 已提交
931
  // Epoch will only increase when received newer epoch ep msg
L
Liu Jicong 已提交
932 933
  SMqRspHead* head = pMsg->pData;
  int32_t     epoch = atomic_load_32(&tmq->epoch);
L
temp  
Liu Jicong 已提交
934
  tscDebug("consumer %ld recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
L
Liu Jicong 已提交
935 936
  if (head->epoch <= epoch) {
    goto END;
937
  }
L
Liu Jicong 已提交
938

X
Xiaoyu Wang 已提交
939 940 941 942 943
  if (pParam->sync) {
    SMqCMGetSubEpRsp rsp;
    tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
    /*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/
    /*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/
L
Liu Jicong 已提交
944
    if (tmqUpdateEp(tmq, head->epoch, &rsp)) {
X
Xiaoyu Wang 已提交
945
      atomic_store_64(&tmq->status, TMQ_CONSUMER_STATUS__READY);
946
    }
X
Xiaoyu Wang 已提交
947 948
    tDeleteSMqCMGetSubEpRsp(&rsp);
  } else {
L
Liu Jicong 已提交
949 950
    SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper));
    if (pWrapper == NULL) {
X
Xiaoyu Wang 已提交
951
      terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
952 953
      code = -1;
      goto END;
X
Xiaoyu Wang 已提交
954
    }
L
Liu Jicong 已提交
955 956 957 958
    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pWrapper->epoch = head->epoch;
    memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
    tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);
L
Liu Jicong 已提交
959

L
Liu Jicong 已提交
960
    taosWriteQitem(tmq->mqueue, pWrapper);
L
temp  
Liu Jicong 已提交
961
    /*tsem_post(&tmq->rspSem);*/
L
Liu Jicong 已提交
962
    taosMemoryFree(pParam);
963
  }
L
Liu Jicong 已提交
964 965

END:
L
fix txn  
Liu Jicong 已提交
966
  atomic_store_8(&tmq->epStatus, 0);
L
Liu Jicong 已提交
967 968 969 970
  if (pParam->sync) {
    tsem_post(&pParam->rspSem);
  }
  return code;
971 972
}

X
Xiaoyu Wang 已提交
973
int32_t tmqAskEp(tmq_t* tmq, bool sync) {
L
Liu Jicong 已提交
974 975
  int32_t code = 0;
  int8_t  epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
L
fix txn  
Liu Jicong 已提交
976
  if (epStatus == 1) {
L
temp  
Liu Jicong 已提交
977
    int32_t epSkipCnt = atomic_add_fetch_32(&tmq->epSkipCnt, 1);
L
Liu Jicong 已提交
978
    tscTrace("consumer %ld skip ask ep cnt %d", tmq->consumerId, epSkipCnt);
L
Liu Jicong 已提交
979
    if (epSkipCnt < 5000) return 0;
L
fix txn  
Liu Jicong 已提交
980
  }
L
temp  
Liu Jicong 已提交
981
  atomic_store_32(&tmq->epSkipCnt, 0);
L
Liu Jicong 已提交
982
  int32_t           tlen = sizeof(SMqCMGetSubEpReq);
wafwerar's avatar
wafwerar 已提交
983
  SMqCMGetSubEpReq* req = taosMemoryMalloc(tlen);
L
Liu Jicong 已提交
984
  if (req == NULL) {
L
Liu Jicong 已提交
985
    tscError("failed to malloc get subscribe ep buf");
L
add log  
Liu Jicong 已提交
986
    atomic_store_8(&tmq->epStatus, 0);
L
Liu Jicong 已提交
987
    return -1;
L
Liu Jicong 已提交
988
  }
L
Liu Jicong 已提交
989 990 991
  req->consumerId = htobe64(tmq->consumerId);
  req->epoch = htonl(tmq->epoch);
  strcpy(req->cgroup, tmq->groupId);
992

wafwerar's avatar
wafwerar 已提交
993
  SMqAskEpCbParam* pParam = taosMemoryMalloc(sizeof(SMqAskEpCbParam));
L
Liu Jicong 已提交
994 995
  if (pParam == NULL) {
    tscError("failed to malloc subscribe param");
wafwerar's avatar
wafwerar 已提交
996
    taosMemoryFree(req);
L
add log  
Liu Jicong 已提交
997
    atomic_store_8(&tmq->epStatus, 0);
L
Liu Jicong 已提交
998
    return -1;
L
Liu Jicong 已提交
999 1000
  }
  pParam->tmq = tmq;
X
Xiaoyu Wang 已提交
1001 1002
  pParam->sync = sync;
  tsem_init(&pParam->rspSem, 0, 0);
1003

wafwerar's avatar
wafwerar 已提交
1004
  SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo));
L
Liu Jicong 已提交
1005 1006
  if (sendInfo == NULL) {
    tsem_destroy(&pParam->rspSem);
wafwerar's avatar
wafwerar 已提交
1007 1008
    taosMemoryFree(pParam);
    taosMemoryFree(req);
L
add log  
Liu Jicong 已提交
1009
    atomic_store_8(&tmq->epStatus, 0);
L
Liu Jicong 已提交
1010 1011 1012 1013 1014 1015 1016 1017 1018 1019
    return -1;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = req,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
L
Liu Jicong 已提交
1020 1021 1022
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqAskEpCb;
L
Liu Jicong 已提交
1023
  sendInfo->msgType = TDMT_MND_GET_SUB_EP;
1024

L
Liu Jicong 已提交
1025 1026
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

L
add log  
Liu Jicong 已提交
1027 1028
  tscDebug("consumer %ld ask ep", tmq->consumerId);

L
Liu Jicong 已提交
1029 1030
  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
1031

L
Liu Jicong 已提交
1032 1033 1034 1035 1036 1037
  if (sync) {
    tsem_wait(&pParam->rspSem);
    code = pParam->code;
    taosMemoryFree(pParam);
  }
  return code;
1038 1039
}

L
Liu Jicong 已提交
1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053
tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
  const SMqOffset* pOffset = &offset->offset;
  if (strcmp(pOffset->cgroup, tmq->groupId) != 0) {
    return TMQ_RESP_ERR__FAIL;
  }
  int32_t sz = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < sz; i++) {
    SMqClientTopic* clientTopic = taosArrayGet(tmq->clientTopics, i);
    if (strcmp(clientTopic->topicName, pOffset->topicName) == 0) {
      int32_t vgSz = taosArrayGetSize(clientTopic->vgs);
      for (int32_t j = 0; j < vgSz; j++) {
        SMqClientVg* pVg = taosArrayGet(clientTopic->vgs, j);
        if (pVg->vgId == pOffset->vgId) {
          pVg->currentOffset = pOffset->offset;
L
Liu Jicong 已提交
1054
          tmqClearUnhandleMsg(tmq);
L
Liu Jicong 已提交
1055 1056 1057 1058 1059 1060 1061 1062
          return TMQ_RESP_ERR__SUCCESS;
        }
      }
    }
  }
  return TMQ_RESP_ERR__FAIL;
}

L
Liu Jicong 已提交
1063
SMqPollReqV2* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTopic* pTopic, SMqClientVg* pVg) {
L
Liu Jicong 已提交
1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074
  int64_t reqOffset;
  if (pVg->currentOffset >= 0) {
    reqOffset = pVg->currentOffset;
  } else {
    if (tmq->resetOffsetCfg == TMQ_CONF__RESET_OFFSET__NONE) {
      tscError("unable to poll since no committed offset but reset offset is set to none");
      return NULL;
    }
    reqOffset = tmq->resetOffsetCfg;
  }

L
Liu Jicong 已提交
1075
  SMqPollReqV2* pReq = taosMemoryMalloc(sizeof(SMqPollReqV2));
1076 1077 1078
  if (pReq == NULL) {
    return NULL;
  }
L
Liu Jicong 已提交
1079

L
Liu Jicong 已提交
1080 1081 1082 1083 1084 1085 1086
  /*strcpy(pReq->topic, pTopic->topicName);*/
  /*strcpy(pReq->cgroup, tmq->groupId);*/

  int32_t tlen = strlen(tmq->groupId);
  memcpy(pReq->subKey, tmq->groupId, tlen);
  pReq->subKey[tlen] = TMQ_SEPARATOR;
  strcpy(pReq->subKey + tlen + 1, pTopic->topicName);
1087

L
fix  
Liu Jicong 已提交
1088
  pReq->blockingTime = blockingTime;
L
Liu Jicong 已提交
1089
  pReq->consumerId = tmq->consumerId;
X
Xiaoyu Wang 已提交
1090
  pReq->epoch = tmq->epoch;
L
Liu Jicong 已提交
1091
  pReq->currentOffset = reqOffset;
L
Liu Jicong 已提交
1092
  pReq->reqId = generateRequestId();
1093 1094

  pReq->head.vgId = htonl(pVg->vgId);
L
Liu Jicong 已提交
1095
  pReq->head.contLen = htonl(sizeof(SMqPollReqV2));
1096 1097 1098
  return pReq;
}

L
Liu Jicong 已提交
1099 1100 1101
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  pRspObj->resType = RES_TYPE__TMQ;
L
Liu Jicong 已提交
1102
  strncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
L
Liu Jicong 已提交
1103
  pRspObj->vgId = pWrapper->vgHandle->vgId;
L
Liu Jicong 已提交
1104 1105
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->msg, sizeof(SMqDataBlkRsp));
L
Liu Jicong 已提交
1106

L
Liu Jicong 已提交
1107 1108 1109
  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
L
Liu Jicong 已提交
1110

L
Liu Jicong 已提交
1111 1112
  taosFreeQitem(pWrapper);
  return pRspObj;
X
Xiaoyu Wang 已提交
1113 1114 1115
}

int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
L
fix  
Liu Jicong 已提交
1116
  /*printf("call poll\n");*/
X
Xiaoyu Wang 已提交
1117 1118 1119 1120 1121 1122
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      int32_t      vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
      if (vgStatus != TMQ_VG_STATUS__IDLE) {
L
Liu Jicong 已提交
1123
        int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
L
Liu Jicong 已提交
1124
        tscTrace("consumer %ld epoch %d skip vg %d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId, vgSkipCnt);
X
Xiaoyu Wang 已提交
1125
        continue;
L
Liu Jicong 已提交
1126
        /*if (vgSkipCnt < 10000) continue;*/
L
temp  
Liu Jicong 已提交
1127 1128 1129 1130 1131 1132 1133
#if 0
        if (skipCnt < 30000) {
          continue;
        } else {
        tscDebug("consumer %ld skip vg %d skip too much reset", tmq->consumerId, pVg->vgId);
        }
#endif
X
Xiaoyu Wang 已提交
1134
      }
L
Liu Jicong 已提交
1135
      atomic_store_32(&pVg->vgSkipCnt, 0);
L
Liu Jicong 已提交
1136
      SMqPollReqV2* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg);
X
Xiaoyu Wang 已提交
1137 1138
      if (pReq == NULL) {
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
L
temp  
Liu Jicong 已提交
1139
        /*tsem_post(&tmq->rspSem);*/
X
Xiaoyu Wang 已提交
1140 1141
        return -1;
      }
wafwerar's avatar
wafwerar 已提交
1142
      SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
L
Liu Jicong 已提交
1143
      if (pParam == NULL) {
wafwerar's avatar
wafwerar 已提交
1144
        taosMemoryFree(pReq);
X
Xiaoyu Wang 已提交
1145
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
L
temp  
Liu Jicong 已提交
1146
        /*tsem_post(&tmq->rspSem);*/
X
Xiaoyu Wang 已提交
1147 1148
        return -1;
      }
L
Liu Jicong 已提交
1149 1150
      pParam->tmq = tmq;
      pParam->pVg = pVg;
L
Liu Jicong 已提交
1151
      pParam->pTopic = pTopic;
L
Liu Jicong 已提交
1152
      pParam->vgId = pVg->vgId;
L
Liu Jicong 已提交
1153 1154 1155
      pParam->epoch = tmq->epoch;
      pParam->sync = 0;

wafwerar's avatar
wafwerar 已提交
1156
      SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo));
L
Liu Jicong 已提交
1157
      if (sendInfo == NULL) {
wafwerar's avatar
wafwerar 已提交
1158 1159
        taosMemoryFree(pReq);
        taosMemoryFree(pParam);
L
Liu Jicong 已提交
1160
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
L
temp  
Liu Jicong 已提交
1161
        /*tsem_post(&tmq->rspSem);*/
L
Liu Jicong 已提交
1162 1163 1164 1165
        return -1;
      }

      sendInfo->msgInfo = (SDataBuf){
X
Xiaoyu Wang 已提交
1166
          .pData = pReq,
L
Liu Jicong 已提交
1167
          .len = sizeof(SMqPollReqV2),
X
Xiaoyu Wang 已提交
1168 1169
          .handle = NULL,
      };
L
Liu Jicong 已提交
1170
      sendInfo->requestId = pReq->reqId;
X
Xiaoyu Wang 已提交
1171
      sendInfo->requestObjRefId = 0;
L
Liu Jicong 已提交
1172
      sendInfo->param = pParam;
X
Xiaoyu Wang 已提交
1173
      sendInfo->fp = tmqPollCb;
L
Liu Jicong 已提交
1174
      sendInfo->msgType = TDMT_VND_CONSUME;
X
Xiaoyu Wang 已提交
1175 1176

      int64_t transporterId = 0;
L
fix  
Liu Jicong 已提交
1177
      /*printf("send poll\n");*/
L
Liu Jicong 已提交
1178
      /*atomic_add_fetch_32(&tmq->waitingRequest, 1);*/
L
Liu Jicong 已提交
1179 1180
      tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %ld, reqId %lu", tmq->consumerId,
               pTopic->topicName, pVg->vgId, tmq->epoch, pVg->currentOffset, pReq->reqId);
L
fix  
Liu Jicong 已提交
1181
      /*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/
X
Xiaoyu Wang 已提交
1182 1183 1184 1185 1186 1187 1188 1189
      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
      pVg->pollCnt++;
      tmq->pollCnt++;
    }
  }
  return 0;
}

L
Liu Jicong 已提交
1190 1191
int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
L
fix  
Liu Jicong 已提交
1192
    /*printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);*/
L
Liu Jicong 已提交
1193 1194 1195 1196
    if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) {
      SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
      SMqCMGetSubEpRsp*   rspMsg = &pEpRspWrapper->msg;
      tmqUpdateEp(tmq, rspWrapper->epoch, rspMsg);
L
temp  
Liu Jicong 已提交
1197
      /*tmqClearUnhandleMsg(tmq);*/
X
Xiaoyu Wang 已提交
1198 1199 1200 1201 1202 1203 1204 1205 1206 1207
      *pReset = true;
    } else {
      *pReset = false;
    }
  } else {
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
1208
SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) {
X
Xiaoyu Wang 已提交
1209
  while (1) {
L
Liu Jicong 已提交
1210 1211 1212
    SMqRspWrapper* rspWrapper = NULL;
    taosGetQitem(tmq->qall, (void**)&rspWrapper);
    if (rspWrapper == NULL) {
L
Liu Jicong 已提交
1213
      taosReadAllQitems(tmq->mqueue, tmq->qall);
L
Liu Jicong 已提交
1214 1215
      taosGetQitem(tmq->qall, (void**)&rspWrapper);
      if (rspWrapper == NULL) return NULL;
X
Xiaoyu Wang 已提交
1216 1217
    }

L
Liu Jicong 已提交
1218 1219
    if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
L
Liu Jicong 已提交
1220
      /*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/
L
fix  
Liu Jicong 已提交
1221
      /*printf("handle poll rsp %d\n", rspMsg->head.mqMsgType);*/
L
Liu Jicong 已提交
1222
      if (pollRspWrapper->msg.head.epoch == atomic_load_32(&tmq->epoch)) {
L
fix  
Liu Jicong 已提交
1223
        /*printf("epoch match\n");*/
L
Liu Jicong 已提交
1224
        SMqClientVg* pVg = pollRspWrapper->vgHandle;
L
Liu Jicong 已提交
1225
        /*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/
L
Liu Jicong 已提交
1226
        pVg->currentOffset = pollRspWrapper->msg.rspOffset;
X
Xiaoyu Wang 已提交
1227
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
L
Liu Jicong 已提交
1228
        if (pollRspWrapper->msg.blockNum == 0) {
L
Liu Jicong 已提交
1229 1230
          taosFreeQitem(pollRspWrapper);
          rspWrapper = NULL;
L
temp  
Liu Jicong 已提交
1231 1232
          continue;
        }
L
Liu Jicong 已提交
1233
        // build rsp
L
Liu Jicong 已提交
1234 1235
        SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
        return pRsp;
X
Xiaoyu Wang 已提交
1236
      } else {
L
Liu Jicong 已提交
1237
        /*printf("epoch mismatch\n");*/
L
Liu Jicong 已提交
1238
        taosFreeQitem(pollRspWrapper);
X
Xiaoyu Wang 已提交
1239 1240
      }
    } else {
L
fix  
Liu Jicong 已提交
1241
      /*printf("handle ep rsp %d\n", rspMsg->head.mqMsgType);*/
X
Xiaoyu Wang 已提交
1242
      bool reset = false;
L
Liu Jicong 已提交
1243 1244
      tmqHandleNoPollRsp(tmq, rspWrapper, &reset);
      taosFreeQitem(rspWrapper);
X
Xiaoyu Wang 已提交
1245
      if (pollIfReset && reset) {
L
add log  
Liu Jicong 已提交
1246
        tscDebug("consumer %ld reset and repoll", tmq->consumerId);
X
Xiaoyu Wang 已提交
1247 1248 1249 1250 1251 1252
        tmqPollImpl(tmq, blockingTime);
      }
    }
  }
}

L
Liu Jicong 已提交
1253 1254 1255
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
  SMqRspObj* rspObj;
  int64_t    startTime = taosGetTimestampMs();
L
Liu Jicong 已提交
1256

L
Liu Jicong 已提交
1257
  // TODO: put into another thread or delayed queue
1258
  int64_t status = atomic_load_64(&tmq->status);
L
Liu Jicong 已提交
1259 1260 1261 1262
  while (0 != tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT)) {
    tscDebug("not ready, retry\n");
    taosSsleep(1);
  }
L
Liu Jicong 已提交
1263

L
Liu Jicong 已提交
1264 1265 1266
  rspObj = tmqHandleAllRsp(tmq, blocking_time, false);
  if (rspObj) {
    return (TAOS_RES*)rspObj;
L
fix  
Liu Jicong 已提交
1267
  }
X
Xiaoyu Wang 已提交
1268 1269 1270

  while (1) {
    /*printf("cycle\n");*/
L
Liu Jicong 已提交
1271
    tmqAskEp(tmq, false);
L
Liu Jicong 已提交
1272
    tmqPollImpl(tmq, blocking_time);
L
Liu Jicong 已提交
1273

L
temp  
Liu Jicong 已提交
1274
    /*tsem_wait(&tmq->rspSem);*/
L
Liu Jicong 已提交
1275

L
Liu Jicong 已提交
1276 1277 1278
    rspObj = tmqHandleAllRsp(tmq, blocking_time, false);
    if (rspObj) {
      return (TAOS_RES*)rspObj;
X
Xiaoyu Wang 已提交
1279 1280 1281 1282
    }
    if (blocking_time != 0) {
      int64_t endTime = taosGetTimestampMs();
      if (endTime - startTime > blocking_time) {
L
Liu Jicong 已提交
1283
        tscDebug("consumer %ld (epoch %d) timeout, no rsp", tmq->consumerId, tmq->epoch);
X
Xiaoyu Wang 已提交
1284 1285 1286 1287 1288 1289
        return NULL;
      }
    }
  }
}

L
Liu Jicong 已提交
1290 1291 1292
tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) {
  // TODO
  return TMQ_RESP_ERR__SUCCESS;
1293
}
L
Liu Jicong 已提交
1294 1295 1296 1297 1298 1299 1300

const char* tmq_err2str(tmq_resp_err_t err) {
  if (err == TMQ_RESP_ERR__SUCCESS) {
    return "success";
  }
  return "fail";
}
L
Liu Jicong 已提交
1301

L
Liu Jicong 已提交
1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314 1315 1316 1317 1318
char* tmq_get_topic_name(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
    return pRspObj->topic;
  } else {
    return NULL;
  }
}

int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
    return pRspObj->vgId;
  } else {
    return -1;
  }
}