tmq.c 45.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
#include "planner.h"
H
Haojun Liao 已提交
20
#include "tdatablock.h"
21 22 23
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
X
Xiaoyu Wang 已提交
24
#include "tqueue.h"
25 26
#include "tref.h"

L
Liu Jicong 已提交
27 28 29 30
struct tmq_message_t {
  SMqPollRsp msg;
  char*      topic;
  SArray*    res;  // SArray<SReqResultInfo>
L
Liu Jicong 已提交
31
  int32_t    vgId;
L
Liu Jicong 已提交
32 33 34
  int32_t    resIter;
};

L
Liu Jicong 已提交
35 36 37 38 39 40 41 42 43 44 45
typedef struct {
  int8_t  tmqRspType;
  int32_t epoch;
} SMqRspWrapper;

typedef struct {
  int8_t           tmqRspType;
  int32_t          epoch;
  SMqCMGetSubEpRsp msg;
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
46
struct tmq_list_t {
L
Liu Jicong 已提交
47
  SArray container;
L
Liu Jicong 已提交
48
};
L
Liu Jicong 已提交
49

L
Liu Jicong 已提交
50
struct tmq_topic_vgroup_t {
L
Liu Jicong 已提交
51
  SMqOffset offset;
L
Liu Jicong 已提交
52 53 54
};

struct tmq_topic_vgroup_list_t {
L
Liu Jicong 已提交
55 56
  int32_t             cnt;
  int32_t             size;
L
Liu Jicong 已提交
57 58 59 60
  tmq_topic_vgroup_t* elems;
};

struct tmq_conf_t {
L
Liu Jicong 已提交
61
  char           clientId[256];
L
Liu Jicong 已提交
62
  char           groupId[TSDB_CGROUP_LEN];
L
Liu Jicong 已提交
63
  int8_t         autoCommit;
L
Liu Jicong 已提交
64
  int8_t         resetOffset;
L
Liu Jicong 已提交
65 66 67 68 69
  uint16_t       port;
  char*          ip;
  char*          user;
  char*          pass;
  char*          db;
L
Liu Jicong 已提交
70
  tmq_commit_cb* commit_cb;
L
Liu Jicong 已提交
71 72 73
};

struct tmq_t {
L
Liu Jicong 已提交
74
  // conf
L
Liu Jicong 已提交
75 76 77 78
  char   groupId[TSDB_CGROUP_LEN];
  char   clientId[256];
  int8_t autoCommit;
  /*int8_t         inWaiting;*/
L
Liu Jicong 已提交
79
  int64_t        consumerId;
L
Liu Jicong 已提交
80
  int32_t        epoch;
L
Liu Jicong 已提交
81
  int32_t        resetOffsetCfg;
L
Liu Jicong 已提交
82 83 84
  int64_t        status;
  STscObj*       pTscObj;
  tmq_commit_cb* commit_cb;
L
Liu Jicong 已提交
85 86 87 88 89 90 91 92 93
  /*int32_t        nextTopicIdx;*/
  int8_t  epStatus;
  int32_t epSkipCnt;
  /*int32_t        waitingRequest;*/
  /*int32_t        readyRequest;*/
  SArray*     clientTopics;  // SArray<SMqClientTopic>
  STaosQueue* mqueue;        // queue of tmq_message_t
  STaosQall*  qall;
  tsem_t      rspSem;
L
Liu Jicong 已提交
94 95
  // stat
  int64_t pollCnt;
L
Liu Jicong 已提交
96 97
};

X
Xiaoyu Wang 已提交
98 99 100 101 102 103 104 105
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT,
};

enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY,
L
Liu Jicong 已提交
106 107
};

L
Liu Jicong 已提交
108
typedef struct {
109 110 111 112
  // statistics
  int64_t pollCnt;
  // offset
  int64_t currentOffset;
L
Liu Jicong 已提交
113
  // connection info
114
  int32_t vgId;
X
Xiaoyu Wang 已提交
115
  int32_t vgStatus;
L
Liu Jicong 已提交
116
  int32_t vgSkipCnt;
117 118 119
  SEpSet  epSet;
} SMqClientVg;

L
Liu Jicong 已提交
120
typedef struct {
121
  // subscribe info
L
Liu Jicong 已提交
122 123 124 125 126 127 128 129
  int32_t        sqlLen;
  char*          sql;
  char*          topicName;
  int64_t        topicId;
  SArray*        vgs;  // SArray<SMqClientVg>
  int8_t         isSchemaAdaptive;
  int32_t        numOfFields;
  SSchemaWrapper schema;
130 131
} SMqClientTopic;

L
Liu Jicong 已提交
132 133 134 135 136
typedef struct {
  int8_t          tmqRspType;
  int32_t         epoch;
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
L
Liu Jicong 已提交
137
  SMqDataBlkRsp   msg;
L
Liu Jicong 已提交
138 139
} SMqPollRspWrapper;

L
Liu Jicong 已提交
140
typedef struct {
L
Liu Jicong 已提交
141 142 143 144
  tmq_t*         tmq;
  tsem_t         rspSem;
  tmq_resp_err_t rspErr;
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
145

L
Liu Jicong 已提交
146
typedef struct {
147
  tmq_t*  tmq;
L
Liu Jicong 已提交
148
  int32_t code;
X
Xiaoyu Wang 已提交
149 150
  int32_t sync;
  tsem_t  rspSem;
151 152
} SMqAskEpCbParam;

L
Liu Jicong 已提交
153
typedef struct {
L
Liu Jicong 已提交
154 155
  tmq_t*          tmq;
  SMqClientVg*    pVg;
L
Liu Jicong 已提交
156
  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
157
  int32_t         epoch;
L
Liu Jicong 已提交
158
  int32_t         vgId;
L
Liu Jicong 已提交
159 160
  tsem_t          rspSem;
  int32_t         sync;
X
Xiaoyu Wang 已提交
161
} SMqPollCbParam;
162

L
Liu Jicong 已提交
163
typedef struct {
L
Liu Jicong 已提交
164
  tmq_t*         tmq;
L
Liu Jicong 已提交
165
  int32_t        async;
L
Liu Jicong 已提交
166 167
  tsem_t         rspSem;
  tmq_resp_err_t rspErr;
L
Liu Jicong 已提交
168
  /*SMqClientVg* pVg;*/
L
Liu Jicong 已提交
169
} SMqCommitCbParam;
L
Liu Jicong 已提交
170

171
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
172
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
L
Liu Jicong 已提交
173
  conf->autoCommit = false;
X
Xiaoyu Wang 已提交
174
  conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
175 176 177
  return conf;
}

L
Liu Jicong 已提交
178
void tmq_conf_destroy(tmq_conf_t* conf) {
wafwerar's avatar
wafwerar 已提交
179
  if (conf) taosMemoryFree(conf);
L
Liu Jicong 已提交
180 181 182
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
183 184
  if (strcmp(key, "group.id") == 0) {
    strcpy(conf->groupId, value);
L
Liu Jicong 已提交
185
    return TMQ_CONF_OK;
186
  }
L
Liu Jicong 已提交
187

188 189
  if (strcmp(key, "client.id") == 0) {
    strcpy(conf->clientId, value);
L
Liu Jicong 已提交
190 191
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
192

L
Liu Jicong 已提交
193 194
  if (strcmp(key, "enable.auto.commit") == 0) {
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
195
      conf->autoCommit = true;
L
Liu Jicong 已提交
196 197
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
198
      conf->autoCommit = false;
L
Liu Jicong 已提交
199 200 201 202
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
203
  }
L
Liu Jicong 已提交
204

L
Liu Jicong 已提交
205 206 207 208 209 210 211 212 213 214 215 216 217 218
  if (strcmp(key, "auto.offset.reset") == 0) {
    if (strcmp(value, "none") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "earliest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
219

L
Liu Jicong 已提交
220
  if (strcmp(key, "td.connect.ip") == 0) {
L
Liu Jicong 已提交
221 222 223
    conf->ip = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
224
  if (strcmp(key, "td.connect.user") == 0) {
L
Liu Jicong 已提交
225 226 227
    conf->user = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
228
  if (strcmp(key, "td.connect.pass") == 0) {
L
Liu Jicong 已提交
229 230 231
    conf->pass = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
232
  if (strcmp(key, "td.connect.port") == 0) {
L
Liu Jicong 已提交
233 234 235
    conf->port = atoi(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
236
  if (strcmp(key, "td.connect.db") == 0) {
L
Liu Jicong 已提交
237 238 239 240
    conf->db = strdup(value);
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
241
  return TMQ_CONF_UNKNOWN;
242 243 244
}

tmq_list_t* tmq_list_new() {
L
Liu Jicong 已提交
245 246
  //
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
247 248
}

L
Liu Jicong 已提交
249 250 251
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  SArray* container = &list->container;
  char*   topic = strdup(src);
L
fix  
Liu Jicong 已提交
252
  if (taosArrayPush(container, &topic) == NULL) return -1;
253 254 255
  return 0;
}

L
Liu Jicong 已提交
256
void tmq_list_destroy(tmq_list_t* list) {
L
Liu Jicong 已提交
257
  SArray* container = &list->container;
L
fix  
Liu Jicong 已提交
258
  /*taosArrayDestroy(container);*/
259 260 261 262 263 264
  int32_t sz = taosArrayGetSize(container);
  for (int32_t i = 0; i < sz; i++) {
    char* str = taosArrayGetP(container, i);
    taosMemoryFree(str);
  }
  taosArrayDestroy(container);
L
Liu Jicong 已提交
265 266
}

L
Liu Jicong 已提交
267 268 269 270
static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
  return sprintf(dst, "%s:%d", topicName, vg);
}

L
Liu Jicong 已提交
271
void tmqClearUnhandleMsg(tmq_t* tmq) {
L
Liu Jicong 已提交
272
  SMqRspWrapper* msg = NULL;
L
Liu Jicong 已提交
273 274 275 276 277 278 279 280
  while (1) {
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg)
      taosFreeQitem(msg);
    else
      break;
  }

L
fix  
Liu Jicong 已提交
281
  msg = NULL;
L
Liu Jicong 已提交
282 283 284 285 286 287 288 289 290 291
  taosReadAllQitems(tmq->mqueue, tmq->qall);
  while (1) {
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg)
      taosFreeQitem(msg);
    else
      break;
  }
}

L
Liu Jicong 已提交
292 293 294 295 296 297
int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) {
  SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
  pParam->rspErr = code;
  tsem_post(&pParam->rspSem);
  return 0;
}
298

L
Liu Jicong 已提交
299
int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
L
Liu Jicong 已提交
300
  SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
L
Liu Jicong 已提交
301
  pParam->rspErr = code == 0 ? TMQ_RESP_ERR__SUCCESS : TMQ_RESP_ERR__FAIL;
L
Liu Jicong 已提交
302
  if (pParam->tmq->commit_cb) {
L
Liu Jicong 已提交
303
    pParam->tmq->commit_cb(pParam->tmq, pParam->rspErr, NULL, NULL);
L
Liu Jicong 已提交
304
  }
L
fix  
Liu Jicong 已提交
305
  if (!pParam->async) tsem_post(&pParam->rspSem);
L
Liu Jicong 已提交
306 307 308
  return 0;
}

X
Xiaoyu Wang 已提交
309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324
tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  if (*topics == NULL) {
    *topics = tmq_list_new();
  }
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* topic = taosArrayGetP(tmq->clientTopics, i);
    tmq_list_append(*topics, strdup(topic->topicName));
  }
  return TMQ_RESP_ERR__SUCCESS;
}

tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq) {
  tmq_list_t* lst = tmq_list_new();
  return tmq_subscribe(tmq, lst);
}

325
tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
wafwerar's avatar
wafwerar 已提交
326
  tmq_t* pTmq = taosMemoryCalloc(sizeof(tmq_t), 1);
327 328 329 330
  if (pTmq == NULL) {
    return NULL;
  }
  pTmq->pTscObj = (STscObj*)conn;
L
Liu Jicong 已提交
331
  /*pTmq->inWaiting = 0;*/
332 333
  pTmq->status = 0;
  pTmq->pollCnt = 0;
L
Liu Jicong 已提交
334
  pTmq->epoch = 0;
L
Liu Jicong 已提交
335 336
  /*pTmq->waitingRequest = 0;*/
  /*pTmq->readyRequest = 0;*/
L
fix txn  
Liu Jicong 已提交
337
  pTmq->epStatus = 0;
L
temp  
Liu Jicong 已提交
338
  pTmq->epSkipCnt = 0;
L
Liu Jicong 已提交
339
  // set conf
340 341
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
L
Liu Jicong 已提交
342
  pTmq->autoCommit = conf->autoCommit;
343
  pTmq->commit_cb = conf->commit_cb;
L
Liu Jicong 已提交
344
  pTmq->resetOffsetCfg = conf->resetOffset;
L
Liu Jicong 已提交
345

L
Liu Jicong 已提交
346 347 348 349 350 351 352 353 354 355
  pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1);
  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  if (pTmq->clientTopics == NULL) {
    taosMemoryFree(pTmq);
    return NULL;
  }

  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();

L
Liu Jicong 已提交
356 357
  tsem_init(&pTmq->rspSem, 0, 0);

L
Liu Jicong 已提交
358 359 360 361 362 363 364 365
  return pTmq;
}

tmq_t* tmq_consumer_new1(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    return NULL;
  }
L
Liu Jicong 已提交
366 367 368 369 370 371 372 373 374
  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  ASSERT(user);
  ASSERT(pass);
  ASSERT(conf->db);

  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, conf->db, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) return NULL;
L
Liu Jicong 已提交
375

L
Liu Jicong 已提交
376
  /*pTmq->inWaiting = 0;*/
L
Liu Jicong 已提交
377 378 379
  pTmq->status = 0;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;
L
Liu Jicong 已提交
380 381
  /*pTmq->waitingRequest = 0;*/
  /*pTmq->readyRequest = 0;*/
L
temp  
Liu Jicong 已提交
382 383
  pTmq->epStatus = 0;
  pTmq->epSkipCnt = 0;
L
Liu Jicong 已提交
384 385 386 387 388 389 390
  // set conf
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
  pTmq->autoCommit = conf->autoCommit;
  pTmq->commit_cb = conf->commit_cb;
  pTmq->resetOffsetCfg = conf->resetOffset;

L
Liu Jicong 已提交
391
  pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1);
392
  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
L
Liu Jicong 已提交
393 394 395 396
  if (pTmq->clientTopics == NULL) {
    taosMemoryFree(pTmq);
    return NULL;
  }
X
Xiaoyu Wang 已提交
397 398 399

  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();
L
Liu Jicong 已提交
400 401 402

  tsem_init(&pTmq->rspSem, 0, 0);

403 404 405
  return pTmq;
}

L
Liu Jicong 已提交
406 407 408 409
tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) {
  // TODO: add read write lock
  SRequestObj*   pRequest = NULL;
  tmq_resp_err_t resp = TMQ_RESP_ERR__SUCCESS;
L
Liu Jicong 已提交
410 411
  // build msg
  // send to mnode
L
Liu Jicong 已提交
412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434
  SMqCMCommitOffsetReq req;
  SArray*              pArray = NULL;

  if (offsets == NULL) {
    pArray = taosArrayInit(0, sizeof(SMqOffset));
    for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
      SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
      for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
        SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
        SMqOffset    offset;
        strcpy(offset.topicName, pTopic->topicName);
        strcpy(offset.cgroup, tmq->groupId);
        offset.vgId = pVg->vgId;
        offset.offset = pVg->currentOffset;
        taosArrayPush(pArray, &offset);
      }
    }
    req.num = pArray->size;
    req.offsets = pArray->pData;
  } else {
    req.num = offsets->cnt;
    req.offsets = (SMqOffset*)offsets->elems;
  }
L
Liu Jicong 已提交
435 436 437 438

  SCoder encoder;

  tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
L
Liu Jicong 已提交
439
  tEncodeSMqCMCommitOffsetReq(&encoder, &req);
L
Liu Jicong 已提交
440
  int32_t tlen = encoder.pos;
wafwerar's avatar
wafwerar 已提交
441
  void*   buf = taosMemoryMalloc(tlen);
L
Liu Jicong 已提交
442 443 444 445 446 447 448
  if (buf == NULL) {
    tCoderClear(&encoder);
    return -1;
  }
  tCoderClear(&encoder);

  tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, tlen, TD_ENCODER);
L
Liu Jicong 已提交
449
  tEncodeSMqCMCommitOffsetReq(&encoder, &req);
L
Liu Jicong 已提交
450 451
  tCoderClear(&encoder);

L
Liu Jicong 已提交
452
  pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_MQ_COMMIT_OFFSET);
L
Liu Jicong 已提交
453 454 455 456
  if (pRequest == NULL) {
    tscError("failed to malloc request");
  }

wafwerar's avatar
wafwerar 已提交
457
  SMqCommitCbParam* pParam = taosMemoryMalloc(sizeof(SMqCommitCbParam));
L
Liu Jicong 已提交
458 459 460 461 462
  if (pParam == NULL) {
    return -1;
  }
  pParam->tmq = tmq;
  tsem_init(&pParam->rspSem, 0, 0);
L
fix  
Liu Jicong 已提交
463
  pParam->async = async;
L
Liu Jicong 已提交
464

X
Xiaoyu Wang 已提交
465 466 467 468 469
  pRequest->body.requestMsg = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };
L
Liu Jicong 已提交
470 471

  SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
L
Liu Jicong 已提交
472 473
  sendInfo->param = pParam;
  sendInfo->fp = tmqCommitCb;
L
Liu Jicong 已提交
474 475 476 477 478
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

L
Liu Jicong 已提交
479 480 481 482
  if (!async) {
    tsem_wait(&pParam->rspSem);
    resp = pParam->rspErr;
  }
L
Liu Jicong 已提交
483

L
fix  
Liu Jicong 已提交
484
  tsem_destroy(&pParam->rspSem);
wafwerar's avatar
wafwerar 已提交
485
  taosMemoryFree(pParam);
L
fix  
Liu Jicong 已提交
486

L
Liu Jicong 已提交
487 488 489 490 491
  if (pArray) {
    taosArrayDestroy(pArray);
  }

  return resp;
L
Liu Jicong 已提交
492 493
}

L
Liu Jicong 已提交
494
tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
L
Liu Jicong 已提交
495
  SRequestObj* pRequest = NULL;
L
Liu Jicong 已提交
496 497
  SArray*      container = &topic_list->container;
  int32_t      sz = taosArrayGetSize(container);
L
Liu Jicong 已提交
498
  // destroy ex
499 500 501 502 503 504
  taosArrayDestroy(tmq->clientTopics);
  tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic));

  SCMSubscribeReq req;
  req.topicNum = sz;
  req.consumerId = tmq->consumerId;
505
  strcpy(req.cgroup, tmq->groupId);
506 507 508
  req.topicNames = taosArrayInit(sz, sizeof(void*));

  for (int i = 0; i < sz; i++) {
L
Liu Jicong 已提交
509 510
    /*char* topicName = topic_list->elems[i];*/
    char* topicName = taosArrayGetP(container, i);
511 512 513

    SName name = {0};
    char* dbName = getDbOfConnection(tmq->pTscObj);
L
Liu Jicong 已提交
514 515 516
    if (dbName == NULL) {
      return TMQ_RESP_ERR__FAIL;
    }
L
Liu Jicong 已提交
517
    tNameSetDbName(&name, tmq->pTscObj->acctId, dbName, strlen(dbName));
518 519
    tNameFromString(&name, topicName, T_NAME_TABLE);

wafwerar's avatar
wafwerar 已提交
520
    char* topicFname = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
521
    if (topicFname == NULL) {
L
Liu Jicong 已提交
522
      goto _return;
523 524 525 526
    }
    tNameExtractFullName(&name, topicFname);
    tscDebug("subscribe topic: %s", topicFname);
    SMqClientTopic topic = {
L
Liu Jicong 已提交
527 528 529 530 531 532
        .sql = NULL,
        .sqlLen = 0,
        .topicId = 0,
        .topicName = topicFname,
        .vgs = NULL,
    };
533
    topic.vgs = taosArrayInit(0, sizeof(SMqClientVg));
L
Liu Jicong 已提交
534
    taosArrayPush(tmq->clientTopics, &topic);
535
    taosArrayPush(req.topicNames, &topicFname);
wafwerar's avatar
wafwerar 已提交
536
    taosMemoryFree(dbName);
537 538
  }

L
Liu Jicong 已提交
539
  int   tlen = tSerializeSCMSubscribeReq(NULL, &req);
wafwerar's avatar
wafwerar 已提交
540
  void* buf = taosMemoryMalloc(tlen);
L
Liu Jicong 已提交
541
  if (buf == NULL) {
542 543 544 545 546 547 548 549 550
    goto _return;
  }

  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);
  /*printf("formatted: %s\n", dagStr);*/

  pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_SUBSCRIBE);
  if (pRequest == NULL) {
L
Liu Jicong 已提交
551
    tscError("failed to malloc request");
552 553
  }

X
Xiaoyu Wang 已提交
554 555 556 557
  SMqSubscribeCbParam param = {
      .rspErr = TMQ_RESP_ERR__SUCCESS,
      .tmq = tmq,
  };
L
Liu Jicong 已提交
558 559
  tsem_init(&param.rspSem, 0, 0);

X
Xiaoyu Wang 已提交
560 561 562 563 564
  pRequest->body.requestMsg = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };
565 566

  SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
L
Liu Jicong 已提交
567 568
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
569 570 571 572 573
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

L
Liu Jicong 已提交
574 575
  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);
576 577 578

_return:
  /*if (sendInfo != NULL) {*/
L
Liu Jicong 已提交
579
  /*destroySendMsgInfo(sendInfo);*/
580 581
  /*}*/

L
Liu Jicong 已提交
582
  return param.rspErr;
583 584
}

L
Liu Jicong 已提交
585
void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) { conf->commit_cb = cb; }
586

L
Liu Jicong 已提交
587 588 589 590 591 592 593 594 595 596 597 598 599 600 601
TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbName, const char* sql) {
  STscObj*     pTscObj = (STscObj*)taos;
  SRequestObj* pRequest = NULL;
  SQuery*      pQueryNode = NULL;
  char*        astStr = NULL;
  int32_t      sqlLen;

  terrno = TSDB_CODE_SUCCESS;
  if (taos == NULL || streamName == NULL || sql == NULL) {
    tscError("invalid parameters for creating stream, connObj:%p, stream name:%s, sql:%s", taos, streamName, sql);
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    goto _return;
  }
  sqlLen = strlen(sql);

L
Liu Jicong 已提交
602 603
  if (strlen(tbName) >= TSDB_TABLE_NAME_LEN) {
    tscError("output tb name too long, max length:%d", TSDB_TABLE_NAME_LEN - 1);
L
Liu Jicong 已提交
604 605 606 607 608 609 610 611 612 613 614 615
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    goto _return;
  }

  if (sqlLen > TSDB_MAX_ALLOWED_SQL_LEN) {
    tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    goto _return;
  }

  tscDebug("start to create stream: %s", streamName);

H
Haojun Liao 已提交
616
  int32_t code = 0;
L
Liu Jicong 已提交
617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635
  CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
  CHECK_CODE_GOTO(parseSql(pRequest, false, &pQueryNode), _return);

  // todo check for invalid sql statement and return with error code

  CHECK_CODE_GOTO(nodesNodeToString(pQueryNode->pRoot, false, &astStr, NULL), _return);

  /*printf("%s\n", pStr);*/

  SName name = {.acctId = pTscObj->acctId, .type = TSDB_TABLE_NAME_T};
  strcpy(name.dbname, pRequest->pDb);
  strcpy(name.tname, streamName);

  SCMCreateStreamReq req = {
      .igExists = 1,
      .ast = astStr,
      .sql = (char*)sql,
  };
  tNameExtractFullName(&name, req.name);
L
Liu Jicong 已提交
636
  strcpy(req.outputSTbName, tbName);
L
Liu Jicong 已提交
637 638

  int   tlen = tSerializeSCMCreateStreamReq(NULL, 0, &req);
wafwerar's avatar
wafwerar 已提交
639
  void* buf = taosMemoryMalloc(tlen);
L
Liu Jicong 已提交
640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662
  if (buf == NULL) {
    goto _return;
  }

  tSerializeSCMCreateStreamReq(buf, tlen, &req);
  /*printf("formatted: %s\n", dagStr);*/

  pRequest->body.requestMsg = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };
  pRequest->type = TDMT_MND_CREATE_STREAM;

  SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
  SEpSet        epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  tsem_wait(&pRequest->body.rspSem);

_return:
wafwerar's avatar
wafwerar 已提交
663
  taosMemoryFreeClear(astStr);
L
Liu Jicong 已提交
664 665 666 667 668 669 670 671 672 673 674 675
  qDestroyQuery(pQueryNode);
  /*if (sendInfo != NULL) {*/
  /*destroySendMsgInfo(sendInfo);*/
  /*}*/

  if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
    pRequest->code = terrno;
  }

  return pRequest;
}

L
Liu Jicong 已提交
676
#if 0
L
Liu Jicong 已提交
677 678 679
TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, int sqlLen) {
  STscObj*     pTscObj = (STscObj*)taos;
  SRequestObj* pRequest = NULL;
L
Liu Jicong 已提交
680
  SQuery*      pQueryNode = NULL;
L
Liu Jicong 已提交
681
  char*        astStr = NULL;
682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701

  terrno = TSDB_CODE_SUCCESS;
  if (taos == NULL || topicName == NULL || sql == NULL) {
    tscError("invalid parameters for creating topic, connObj:%p, topic name:%s, sql:%s", taos, topicName, sql);
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    goto _return;
  }

  if (strlen(topicName) >= TSDB_TOPIC_NAME_LEN) {
    tscError("topic name too long, max length:%d", TSDB_TOPIC_NAME_LEN - 1);
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    goto _return;
  }

  if (sqlLen > TSDB_MAX_ALLOWED_SQL_LEN) {
    tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    goto _return;
  }

L
Liu Jicong 已提交
702
  tscDebug("start to create topic: %s", topicName);
703

X
Xiaoyu Wang 已提交
704
  int32_t code = TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
705 706
  CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
  CHECK_CODE_GOTO(parseSql(pRequest, true, &pQueryNode), _return);
707 708 709

  // todo check for invalid sql statement and return with error code

L
Liu Jicong 已提交
710
  CHECK_CODE_GOTO(nodesNodeToString(pQueryNode->pRoot, false, &astStr, NULL), _return);
711

L
Liu Jicong 已提交
712
  /*printf("%s\n", pStr);*/
713

L
Liu Jicong 已提交
714
  SName name = {.acctId = pTscObj->acctId, .type = TSDB_TABLE_NAME_T};
X
Xiaoyu Wang 已提交
715 716
  strcpy(name.dbname, pRequest->pDb);
  strcpy(name.tname, topicName);
717

L
Liu Jicong 已提交
718
  SCMCreateTopicReq req = {
L
Liu Jicong 已提交
719
      .igExists = 1,
L
Liu Jicong 已提交
720
      .ast = astStr,
L
Liu Jicong 已提交
721
      .sql = (char*)sql,
722
  };
L
Liu Jicong 已提交
723
  tNameExtractFullName(&name, req.name);
724

L
Liu Jicong 已提交
725
  int   tlen = tSerializeSCMCreateTopicReq(NULL, 0, &req);
wafwerar's avatar
wafwerar 已提交
726
  void* buf = taosMemoryMalloc(tlen);
727 728 729 730
  if (buf == NULL) {
    goto _return;
  }

L
Liu Jicong 已提交
731
  tSerializeSCMCreateTopicReq(buf, tlen, &req);
732 733
  /*printf("formatted: %s\n", dagStr);*/

L
Liu Jicong 已提交
734 735 736 737 738
  pRequest->body.requestMsg = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };
739 740 741
  pRequest->type = TDMT_MND_CREATE_TOPIC;

  SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
L
Liu Jicong 已提交
742
  SEpSet        epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
743 744 745 746 747

  int64_t transporterId = 0;
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  tsem_wait(&pRequest->body.rspSem);
X
Xiaoyu Wang 已提交
748

749
_return:
wafwerar's avatar
wafwerar 已提交
750
  taosMemoryFreeClear(astStr);
751 752
  qDestroyQuery(pQueryNode);
  /*if (sendInfo != NULL) {*/
L
Liu Jicong 已提交
753
  /*destroySendMsgInfo(sendInfo);*/
754 755 756 757 758 759 760 761
  /*}*/

  if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
    pRequest->code = terrno;
  }

  return pRequest;
}
L
Liu Jicong 已提交
762
#endif
763

L
Liu Jicong 已提交
764
static char* formatTimestamp(char* buf, int64_t val, int precision) {
765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799
  time_t  tt;
  int32_t ms = 0;
  if (precision == TSDB_TIME_PRECISION_NANO) {
    tt = (time_t)(val / 1000000000);
    ms = val % 1000000000;
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
    tt = (time_t)(val / 1000000);
    ms = val % 1000000;
  } else {
    tt = (time_t)(val / 1000);
    ms = val % 1000;
  }

  /* comment out as it make testcases like select_with_tags.sim fail.
    but in windows, this may cause the call to localtime crash if tt < 0,
    need to find a better solution.
    if (tt < 0) {
      tt = 0;
    }
    */

#ifdef WINDOWS
  if (tt < 0) tt = 0;
#endif
  if (tt <= 0 && ms < 0) {
    tt--;
    if (precision == TSDB_TIME_PRECISION_NANO) {
      ms += 1000000000;
    } else if (precision == TSDB_TIME_PRECISION_MICRO) {
      ms += 1000000;
    } else {
      ms += 1000;
    }
  }

800
  struct tm* ptm = taosLocalTime(&tt, NULL);
801 802 803 804 805 806 807 808 809 810 811 812
  size_t     pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm);

  if (precision == TSDB_TIME_PRECISION_NANO) {
    sprintf(buf + pos, ".%09d", ms);
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
    sprintf(buf + pos, ".%06d", ms);
  } else {
    sprintf(buf + pos, ".%03d", ms);
  }

  return buf;
}
L
Liu Jicong 已提交
813
#if 0
L
Liu Jicong 已提交
814 815
int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) {
  if (tmq_message == NULL) return 0;
L
Liu Jicong 已提交
816
  SMqPollRsp* pRsp = &tmq_message->msg;
L
Liu Jicong 已提交
817 818 819
  return pRsp->skipLogNum;
}

L
Liu Jicong 已提交
820 821 822
void tmqShowMsg(tmq_message_t* tmq_message) {
  if (tmq_message == NULL) return;

L
Liu Jicong 已提交
823 824
  static bool noPrintSchema;
  char        pBuf[128];
L
Liu Jicong 已提交
825
  SMqPollRsp* pRsp = &tmq_message->msg;
L
fix  
Liu Jicong 已提交
826
  int32_t     colNum = 2;
L
Liu Jicong 已提交
827 828 829 830
  if (!noPrintSchema) {
    printf("|");
    for (int32_t i = 0; i < colNum; i++) {
      if (i == 0)
L
Liu Jicong 已提交
831
        printf(" %25s |", pRsp->schema->pSchema[i].name);
L
Liu Jicong 已提交
832
      else
L
Liu Jicong 已提交
833
        printf(" %15s |", pRsp->schema->pSchema[i].name);
L
Liu Jicong 已提交
834 835 836 837
    }
    printf("\n");
    printf("===============================================\n");
    noPrintSchema = true;
838
  }
L
Liu Jicong 已提交
839
  int32_t sz = taosArrayGetSize(pRsp->pBlockData);
840
  for (int32_t i = 0; i < sz; i++) {
L
Liu Jicong 已提交
841 842
    SSDataBlock* pDataBlock = taosArrayGet(pRsp->pBlockData, i);
    int32_t      rows = pDataBlock->info.rows;
843 844 845 846
    for (int32_t j = 0; j < rows; j++) {
      printf("|");
      for (int32_t k = 0; k < colNum; k++) {
        SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
L
Liu Jicong 已提交
847 848
        void*            var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
        switch (pColInfoData->info.type) {
849 850 851 852 853 854 855 856 857 858 859 860 861
          case TSDB_DATA_TYPE_TIMESTAMP:
            formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI);
            printf(" %25s |", pBuf);
            break;
          case TSDB_DATA_TYPE_INT:
          case TSDB_DATA_TYPE_UINT:
            printf(" %15u |", *(uint32_t*)var);
            break;
        }
      }
      printf("\n");
    }
  }
L
Liu Jicong 已提交
862
}
L
Liu Jicong 已提交
863
#endif
L
Liu Jicong 已提交
864 865

int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
X
Xiaoyu Wang 已提交
866 867
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;
  SMqClientVg*    pVg = pParam->pVg;
L
Liu Jicong 已提交
868
  SMqClientTopic* pTopic = pParam->pTopic;
X
Xiaoyu Wang 已提交
869
  tmq_t*          tmq = pParam->tmq;
L
Liu Jicong 已提交
870
  if (code != 0) {
L
Liu Jicong 已提交
871
    tscWarn("msg discard from vg %d, epoch %d, code:%x", pParam->vgId, pParam->epoch, code);
L
fix txn  
Liu Jicong 已提交
872
    goto CREATE_MSG_FAIL;
L
Liu Jicong 已提交
873 874
  }

X
Xiaoyu Wang 已提交
875 876 877
  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch < tmqEpoch) {
L
Liu Jicong 已提交
878
    // do not write into queue since updating epoch reset
L
Liu Jicong 已提交
879 880
    tscWarn("msg discard from vg %d since from earlier epoch, rsp epoch %d, current epoch %d", pParam->vgId, msgEpoch,
            tmqEpoch);
L
Liu Jicong 已提交
881
    /*tsem_post(&tmq->rspSem);*/
X
Xiaoyu Wang 已提交
882 883 884 885
    return 0;
  }

  if (msgEpoch != tmqEpoch) {
L
Liu Jicong 已提交
886
    tscWarn("mismatch rsp from vg %d, epoch %d, current epoch %d", pParam->vgId, msgEpoch, tmqEpoch);
X
Xiaoyu Wang 已提交
887 888
  }

L
Liu Jicong 已提交
889
#if 0
L
Liu Jicong 已提交
890
  if (pParam->sync == 1) {
wafwerar's avatar
wafwerar 已提交
891
    /**pParam->msg = taosMemoryMalloc(sizeof(tmq_message_t));*/
L
Liu Jicong 已提交
892 893 894 895 896 897 898 899 900 901 902 903 904 905
    *pParam->msg = taosAllocateQitem(sizeof(tmq_message_t));
    if (*pParam->msg) {
      memcpy(*pParam->msg, pMsg->pData, sizeof(SMqRspHead));
      tDecodeSMqConsumeRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &((*pParam->msg)->consumeRsp));
      if ((*pParam->msg)->consumeRsp.numOfTopics != 0) {
        pVg->currentOffset = (*pParam->msg)->consumeRsp.rspOffset;
      }
      taosWriteQitem(tmq->mqueue, *pParam->msg);
      tsem_post(&pParam->rspSem);
      return 0;
    }
    tsem_post(&pParam->rspSem);
    return -1;
  }
L
Liu Jicong 已提交
906
#endif
L
Liu Jicong 已提交
907

L
Liu Jicong 已提交
908 909
  SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper));
  if (pRspWrapper == NULL) {
L
Liu Jicong 已提交
910
    tscWarn("msg discard from vg %d, epoch %d since out of memory", pParam->vgId, pParam->epoch);
L
fix txn  
Liu Jicong 已提交
911
    goto CREATE_MSG_FAIL;
L
Liu Jicong 已提交
912
  }
L
Liu Jicong 已提交
913

L
Liu Jicong 已提交
914 915 916
  pRspWrapper->tmqRspType = TMQ_MSG_TYPE__POLL_RSP;
  pRspWrapper->vgHandle = pVg;
  pRspWrapper->topicHandle = pTopic;
L
Liu Jicong 已提交
917

L
Liu Jicong 已提交
918 919
  memcpy(&pRspWrapper->msg, pMsg->pData, sizeof(SMqRspHead));

L
Liu Jicong 已提交
920
  tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->msg);
L
Liu Jicong 已提交
921

L
Liu Jicong 已提交
922 923
  tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld", tmq->consumerId, pVg->vgId,
           pRspWrapper->msg.reqOffset, pRspWrapper->msg.rspOffset);
L
fix  
Liu Jicong 已提交
924

L
Liu Jicong 已提交
925
  taosWriteQitem(tmq->mqueue, pRspWrapper);
L
temp  
Liu Jicong 已提交
926
  /*tsem_post(&tmq->rspSem);*/
L
Liu Jicong 已提交
927

L
Liu Jicong 已提交
928
  return 0;
L
fix txn  
Liu Jicong 已提交
929
CREATE_MSG_FAIL:
L
Liu Jicong 已提交
930 931 932
  if (pParam->epoch == tmq->epoch) {
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }
L
temp  
Liu Jicong 已提交
933
  /*tsem_post(&tmq->rspSem);*/
L
Liu Jicong 已提交
934
  return -1;
935 936
}

X
Xiaoyu Wang 已提交
937
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
L
Liu Jicong 已提交
938
  /*printf("call update ep %d\n", epoch);*/
X
Xiaoyu Wang 已提交
939
  bool    set = false;
L
Liu Jicong 已提交
940 941
  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];
L
Liu Jicong 已提交
942 943
  tscDebug("consumer %ld update ep epoch %d to epoch %d, topic num: %d", tmq->consumerId, tmq->epoch, epoch,
           topicNumGet);
L
Liu Jicong 已提交
944 945 946 947 948 949 950 951 952 953 954 955
  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    return false;
  }
  SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pHash == NULL) {
    taosArrayDestroy(newTopics);
    return false;
  }

  // find topic, build hash
  for (int32_t i = 0; i < topicNumGet; i++) {
X
Xiaoyu Wang 已提交
956 957
    SMqClientTopic topic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
L
Liu Jicong 已提交
958
    topic.schema = pTopicEp->schema;
L
Liu Jicong 已提交
959
    taosHashClear(pHash);
X
Xiaoyu Wang 已提交
960
    topic.topicName = strdup(pTopicEp->topic);
L
Liu Jicong 已提交
961

L
Liu Jicong 已提交
962
    tscDebug("consumer %ld update topic: %s", tmq->consumerId, topic.topicName);
L
Liu Jicong 已提交
963 964 965 966 967 968
    int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
    for (int32_t j = 0; j < topicNumCur; j++) {
      // find old topic
      SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j);
      if (pTopicCur->vgs && strcmp(pTopicCur->topicName, pTopicEp->topic) == 0) {
        int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
L
Liu Jicong 已提交
969
        tscDebug("consumer %ld new vg num: %d", tmq->consumerId, vgNumCur);
L
Liu Jicong 已提交
970 971 972 973
        if (vgNumCur == 0) break;
        for (int32_t k = 0; k < vgNumCur; k++) {
          SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, k);
          sprintf(vgKey, "%s:%d", topic.topicName, pVgCur->vgId);
L
Liu Jicong 已提交
974
          tscDebug("consumer %ld epoch %d vg %d build %s", tmq->consumerId, epoch, pVgCur->vgId, vgKey);
L
Liu Jicong 已提交
975 976 977 978 979 980 981 982 983
          taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t));
        }
        break;
      }
    }

    int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
    topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
    for (int32_t j = 0; j < vgNumGet; j++) {
X
Xiaoyu Wang 已提交
984
      SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
L
Liu Jicong 已提交
985 986 987
      sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId);
      int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
      int64_t  offset = pVgEp->offset;
L
Liu Jicong 已提交
988
      tscDebug("consumer %ld epoch %d vg %d offset og to %ld", tmq->consumerId, epoch, pVgEp->vgId, offset);
L
Liu Jicong 已提交
989 990
      if (pOffset != NULL) {
        offset = *pOffset;
L
Liu Jicong 已提交
991
        tscDebug("consumer %ld epoch %d vg %d found %s", tmq->consumerId, epoch, pVgEp->vgId, vgKey);
L
Liu Jicong 已提交
992
      }
L
Liu Jicong 已提交
993
      tscDebug("consumer %ld epoch %d vg %d offset set to %ld", tmq->consumerId, epoch, pVgEp->vgId, offset);
X
Xiaoyu Wang 已提交
994 995
      SMqClientVg clientVg = {
          .pollCnt = 0,
L
Liu Jicong 已提交
996
          .currentOffset = offset,
X
Xiaoyu Wang 已提交
997 998 999
          .vgId = pVgEp->vgId,
          .epSet = pVgEp->epSet,
          .vgStatus = TMQ_VG_STATUS__IDLE,
L
Liu Jicong 已提交
1000
          .vgSkipCnt = 0,
X
Xiaoyu Wang 已提交
1001 1002 1003 1004
      };
      taosArrayPush(topic.vgs, &clientVg);
      set = true;
    }
L
Liu Jicong 已提交
1005
    taosArrayPush(newTopics, &topic);
X
Xiaoyu Wang 已提交
1006
  }
L
Liu Jicong 已提交
1007
  if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
L
Liu Jicong 已提交
1008
  taosHashCleanup(pHash);
L
Liu Jicong 已提交
1009
  tmq->clientTopics = newTopics;
X
Xiaoyu Wang 已提交
1010 1011 1012 1013
  atomic_store_32(&tmq->epoch, epoch);
  return set;
}

L
Liu Jicong 已提交
1014
int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
1015
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
L
Liu Jicong 已提交
1016
  tmq_t*           tmq = pParam->tmq;
L
Liu Jicong 已提交
1017
  pParam->code = code;
1018
  if (code != 0) {
L
temp  
Liu Jicong 已提交
1019
    tscError("consumer %ld get topic endpoint error, not ready, wait:%d", tmq->consumerId, pParam->sync);
L
Liu Jicong 已提交
1020
    goto END;
1021
  }
L
Liu Jicong 已提交
1022

L
Liu Jicong 已提交
1023
  // tmq's epoch is monotonically increase,
L
Liu Jicong 已提交
1024
  // so it's safe to discard any old epoch msg.
L
Liu Jicong 已提交
1025
  // Epoch will only increase when received newer epoch ep msg
L
Liu Jicong 已提交
1026 1027
  SMqRspHead* head = pMsg->pData;
  int32_t     epoch = atomic_load_32(&tmq->epoch);
L
temp  
Liu Jicong 已提交
1028
  tscDebug("consumer %ld recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
L
Liu Jicong 已提交
1029 1030
  if (head->epoch <= epoch) {
    goto END;
1031
  }
L
Liu Jicong 已提交
1032

X
Xiaoyu Wang 已提交
1033 1034 1035 1036 1037
  if (pParam->sync) {
    SMqCMGetSubEpRsp rsp;
    tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
    /*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/
    /*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/
L
Liu Jicong 已提交
1038
    if (tmqUpdateEp(tmq, head->epoch, &rsp)) {
X
Xiaoyu Wang 已提交
1039
      atomic_store_64(&tmq->status, TMQ_CONSUMER_STATUS__READY);
1040
    }
X
Xiaoyu Wang 已提交
1041 1042
    tDeleteSMqCMGetSubEpRsp(&rsp);
  } else {
L
Liu Jicong 已提交
1043 1044 1045
    /*SMqCMGetSubEpRsp* pRsp = taosAllocateQitem(sizeof(SMqCMGetSubEpRsp));*/
    SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper));
    if (pWrapper == NULL) {
X
Xiaoyu Wang 已提交
1046
      terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
1047 1048
      code = -1;
      goto END;
X
Xiaoyu Wang 已提交
1049
    }
L
Liu Jicong 已提交
1050 1051 1052 1053
    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pWrapper->epoch = head->epoch;
    memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
    tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);
L
Liu Jicong 已提交
1054

L
Liu Jicong 已提交
1055
    taosWriteQitem(tmq->mqueue, pWrapper);
L
temp  
Liu Jicong 已提交
1056
    /*tsem_post(&tmq->rspSem);*/
L
Liu Jicong 已提交
1057
    taosMemoryFree(pParam);
1058
  }
L
Liu Jicong 已提交
1059 1060

END:
L
fix txn  
Liu Jicong 已提交
1061
  atomic_store_8(&tmq->epStatus, 0);
L
Liu Jicong 已提交
1062 1063 1064 1065
  if (pParam->sync) {
    tsem_post(&pParam->rspSem);
  }
  return code;
1066 1067
}

X
Xiaoyu Wang 已提交
1068
int32_t tmqAskEp(tmq_t* tmq, bool sync) {
L
Liu Jicong 已提交
1069 1070
  int32_t code = 0;
  int8_t  epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
L
fix txn  
Liu Jicong 已提交
1071
  if (epStatus == 1) {
L
temp  
Liu Jicong 已提交
1072
    int32_t epSkipCnt = atomic_add_fetch_32(&tmq->epSkipCnt, 1);
L
Liu Jicong 已提交
1073
    tscTrace("consumer %ld skip ask ep cnt %d", tmq->consumerId, epSkipCnt);
L
Liu Jicong 已提交
1074
    if (epSkipCnt < 5000) return 0;
L
fix txn  
Liu Jicong 已提交
1075
  }
L
temp  
Liu Jicong 已提交
1076
  atomic_store_32(&tmq->epSkipCnt, 0);
L
Liu Jicong 已提交
1077
  int32_t           tlen = sizeof(SMqCMGetSubEpReq);
wafwerar's avatar
wafwerar 已提交
1078
  SMqCMGetSubEpReq* req = taosMemoryMalloc(tlen);
L
Liu Jicong 已提交
1079
  if (req == NULL) {
L
Liu Jicong 已提交
1080
    tscError("failed to malloc get subscribe ep buf");
L
add log  
Liu Jicong 已提交
1081
    atomic_store_8(&tmq->epStatus, 0);
L
Liu Jicong 已提交
1082
    return -1;
L
Liu Jicong 已提交
1083
  }
L
Liu Jicong 已提交
1084 1085 1086
  req->consumerId = htobe64(tmq->consumerId);
  req->epoch = htonl(tmq->epoch);
  strcpy(req->cgroup, tmq->groupId);
1087

wafwerar's avatar
wafwerar 已提交
1088
  SMqAskEpCbParam* pParam = taosMemoryMalloc(sizeof(SMqAskEpCbParam));
L
Liu Jicong 已提交
1089 1090
  if (pParam == NULL) {
    tscError("failed to malloc subscribe param");
wafwerar's avatar
wafwerar 已提交
1091
    taosMemoryFree(req);
L
add log  
Liu Jicong 已提交
1092
    atomic_store_8(&tmq->epStatus, 0);
L
Liu Jicong 已提交
1093
    return -1;
L
Liu Jicong 已提交
1094 1095
  }
  pParam->tmq = tmq;
X
Xiaoyu Wang 已提交
1096 1097
  pParam->sync = sync;
  tsem_init(&pParam->rspSem, 0, 0);
1098

wafwerar's avatar
wafwerar 已提交
1099
  SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo));
L
Liu Jicong 已提交
1100 1101
  if (sendInfo == NULL) {
    tsem_destroy(&pParam->rspSem);
wafwerar's avatar
wafwerar 已提交
1102 1103
    taosMemoryFree(pParam);
    taosMemoryFree(req);
L
add log  
Liu Jicong 已提交
1104
    atomic_store_8(&tmq->epStatus, 0);
L
Liu Jicong 已提交
1105 1106 1107 1108 1109 1110 1111 1112 1113 1114
    return -1;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = req,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
L
Liu Jicong 已提交
1115 1116 1117
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqAskEpCb;
L
Liu Jicong 已提交
1118
  sendInfo->msgType = TDMT_MND_GET_SUB_EP;
1119

L
Liu Jicong 已提交
1120 1121
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

L
add log  
Liu Jicong 已提交
1122 1123
  tscDebug("consumer %ld ask ep", tmq->consumerId);

L
Liu Jicong 已提交
1124 1125
  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
1126

L
Liu Jicong 已提交
1127 1128 1129 1130 1131 1132
  if (sync) {
    tsem_wait(&pParam->rspSem);
    code = pParam->code;
    taosMemoryFree(pParam);
  }
  return code;
1133 1134
}

L
Liu Jicong 已提交
1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148
tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
  const SMqOffset* pOffset = &offset->offset;
  if (strcmp(pOffset->cgroup, tmq->groupId) != 0) {
    return TMQ_RESP_ERR__FAIL;
  }
  int32_t sz = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < sz; i++) {
    SMqClientTopic* clientTopic = taosArrayGet(tmq->clientTopics, i);
    if (strcmp(clientTopic->topicName, pOffset->topicName) == 0) {
      int32_t vgSz = taosArrayGetSize(clientTopic->vgs);
      for (int32_t j = 0; j < vgSz; j++) {
        SMqClientVg* pVg = taosArrayGet(clientTopic->vgs, j);
        if (pVg->vgId == pOffset->vgId) {
          pVg->currentOffset = pOffset->offset;
L
Liu Jicong 已提交
1149
          tmqClearUnhandleMsg(tmq);
L
Liu Jicong 已提交
1150 1151 1152 1153 1154 1155 1156 1157
          return TMQ_RESP_ERR__SUCCESS;
        }
      }
    }
  }
  return TMQ_RESP_ERR__FAIL;
}

L
Liu Jicong 已提交
1158
SMqPollReqV2* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTopic* pTopic, SMqClientVg* pVg) {
L
Liu Jicong 已提交
1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169
  int64_t reqOffset;
  if (pVg->currentOffset >= 0) {
    reqOffset = pVg->currentOffset;
  } else {
    if (tmq->resetOffsetCfg == TMQ_CONF__RESET_OFFSET__NONE) {
      tscError("unable to poll since no committed offset but reset offset is set to none");
      return NULL;
    }
    reqOffset = tmq->resetOffsetCfg;
  }

L
Liu Jicong 已提交
1170
  SMqPollReqV2* pReq = taosMemoryMalloc(sizeof(SMqPollReqV2));
1171 1172 1173
  if (pReq == NULL) {
    return NULL;
  }
L
Liu Jicong 已提交
1174

L
Liu Jicong 已提交
1175 1176 1177 1178 1179 1180 1181
  /*strcpy(pReq->topic, pTopic->topicName);*/
  /*strcpy(pReq->cgroup, tmq->groupId);*/

  int32_t tlen = strlen(tmq->groupId);
  memcpy(pReq->subKey, tmq->groupId, tlen);
  pReq->subKey[tlen] = TMQ_SEPARATOR;
  strcpy(pReq->subKey + tlen + 1, pTopic->topicName);
1182

L
fix  
Liu Jicong 已提交
1183
  pReq->blockingTime = blockingTime;
L
Liu Jicong 已提交
1184
  pReq->consumerId = tmq->consumerId;
X
Xiaoyu Wang 已提交
1185
  pReq->epoch = tmq->epoch;
L
Liu Jicong 已提交
1186
  pReq->currentOffset = reqOffset;
L
Liu Jicong 已提交
1187
  pReq->reqId = generateRequestId();
1188 1189

  pReq->head.vgId = htonl(pVg->vgId);
L
Liu Jicong 已提交
1190
  pReq->head.contLen = htonl(sizeof(SMqPollReqV2));
1191 1192 1193
  return pReq;
}

L
Liu Jicong 已提交
1194 1195 1196
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  pRspObj->resType = RES_TYPE__TMQ;
L
Liu Jicong 已提交
1197
  strncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
L
Liu Jicong 已提交
1198
  pRspObj->vgId = pWrapper->vgHandle->vgId;
L
Liu Jicong 已提交
1199 1200
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->msg, sizeof(SMqDataBlkRsp));
L
Liu Jicong 已提交
1201

L
Liu Jicong 已提交
1202 1203 1204 1205
  /*SRetrieveTableRsp* pRetrieve = taosArrayGetP(pWrapper->msg.blockData, 0);*/
  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
L
Liu Jicong 已提交
1206

L
Liu Jicong 已提交
1207 1208
  taosFreeQitem(pWrapper);
  return pRspObj;
X
Xiaoyu Wang 已提交
1209 1210 1211
}

int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
L
fix  
Liu Jicong 已提交
1212
  /*printf("call poll\n");*/
X
Xiaoyu Wang 已提交
1213 1214 1215 1216 1217 1218
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      int32_t      vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
      if (vgStatus != TMQ_VG_STATUS__IDLE) {
L
Liu Jicong 已提交
1219
        int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
L
Liu Jicong 已提交
1220
        tscTrace("consumer %ld epoch %d skip vg %d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId, vgSkipCnt);
X
Xiaoyu Wang 已提交
1221
        continue;
L
Liu Jicong 已提交
1222
        /*if (vgSkipCnt < 10000) continue;*/
L
temp  
Liu Jicong 已提交
1223 1224 1225 1226 1227 1228 1229
#if 0
        if (skipCnt < 30000) {
          continue;
        } else {
        tscDebug("consumer %ld skip vg %d skip too much reset", tmq->consumerId, pVg->vgId);
        }
#endif
X
Xiaoyu Wang 已提交
1230
      }
L
Liu Jicong 已提交
1231
      atomic_store_32(&pVg->vgSkipCnt, 0);
L
Liu Jicong 已提交
1232
      SMqPollReqV2* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg);
X
Xiaoyu Wang 已提交
1233 1234
      if (pReq == NULL) {
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
L
temp  
Liu Jicong 已提交
1235
        /*tsem_post(&tmq->rspSem);*/
X
Xiaoyu Wang 已提交
1236 1237
        return -1;
      }
wafwerar's avatar
wafwerar 已提交
1238
      SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
L
Liu Jicong 已提交
1239
      if (pParam == NULL) {
wafwerar's avatar
wafwerar 已提交
1240
        taosMemoryFree(pReq);
X
Xiaoyu Wang 已提交
1241
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
L
temp  
Liu Jicong 已提交
1242
        /*tsem_post(&tmq->rspSem);*/
X
Xiaoyu Wang 已提交
1243 1244
        return -1;
      }
L
Liu Jicong 已提交
1245 1246
      pParam->tmq = tmq;
      pParam->pVg = pVg;
L
Liu Jicong 已提交
1247
      pParam->pTopic = pTopic;
L
Liu Jicong 已提交
1248
      pParam->vgId = pVg->vgId;
L
Liu Jicong 已提交
1249 1250 1251
      pParam->epoch = tmq->epoch;
      pParam->sync = 0;

wafwerar's avatar
wafwerar 已提交
1252
      SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo));
L
Liu Jicong 已提交
1253
      if (sendInfo == NULL) {
wafwerar's avatar
wafwerar 已提交
1254 1255
        taosMemoryFree(pReq);
        taosMemoryFree(pParam);
L
Liu Jicong 已提交
1256
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
L
temp  
Liu Jicong 已提交
1257
        /*tsem_post(&tmq->rspSem);*/
L
Liu Jicong 已提交
1258 1259 1260 1261
        return -1;
      }

      sendInfo->msgInfo = (SDataBuf){
X
Xiaoyu Wang 已提交
1262
          .pData = pReq,
L
Liu Jicong 已提交
1263
          .len = sizeof(SMqPollReqV2),
X
Xiaoyu Wang 已提交
1264 1265
          .handle = NULL,
      };
L
Liu Jicong 已提交
1266
      sendInfo->requestId = pReq->reqId;
X
Xiaoyu Wang 已提交
1267
      sendInfo->requestObjRefId = 0;
L
Liu Jicong 已提交
1268
      sendInfo->param = pParam;
X
Xiaoyu Wang 已提交
1269
      sendInfo->fp = tmqPollCb;
L
Liu Jicong 已提交
1270
      sendInfo->msgType = TDMT_VND_CONSUME;
X
Xiaoyu Wang 已提交
1271 1272

      int64_t transporterId = 0;
L
fix  
Liu Jicong 已提交
1273
      /*printf("send poll\n");*/
L
Liu Jicong 已提交
1274
      /*atomic_add_fetch_32(&tmq->waitingRequest, 1);*/
L
Liu Jicong 已提交
1275 1276
      tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %ld, reqId %lu", tmq->consumerId,
               pTopic->topicName, pVg->vgId, tmq->epoch, pVg->currentOffset, pReq->reqId);
L
fix  
Liu Jicong 已提交
1277
      /*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/
X
Xiaoyu Wang 已提交
1278 1279 1280 1281 1282 1283 1284 1285
      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
      pVg->pollCnt++;
      tmq->pollCnt++;
    }
  }
  return 0;
}

L
Liu Jicong 已提交
1286 1287
int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
L
fix  
Liu Jicong 已提交
1288
    /*printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);*/
L
Liu Jicong 已提交
1289 1290 1291 1292
    if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) {
      SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
      SMqCMGetSubEpRsp*   rspMsg = &pEpRspWrapper->msg;
      tmqUpdateEp(tmq, rspWrapper->epoch, rspMsg);
L
temp  
Liu Jicong 已提交
1293
      /*tmqClearUnhandleMsg(tmq);*/
X
Xiaoyu Wang 已提交
1294 1295 1296 1297 1298 1299 1300 1301 1302 1303
      *pReset = true;
    } else {
      *pReset = false;
    }
  } else {
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
1304
SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) {
X
Xiaoyu Wang 已提交
1305
  while (1) {
L
Liu Jicong 已提交
1306 1307 1308
    SMqRspWrapper* rspWrapper = NULL;
    taosGetQitem(tmq->qall, (void**)&rspWrapper);
    if (rspWrapper == NULL) {
L
Liu Jicong 已提交
1309
      taosReadAllQitems(tmq->mqueue, tmq->qall);
L
Liu Jicong 已提交
1310 1311
      taosGetQitem(tmq->qall, (void**)&rspWrapper);
      if (rspWrapper == NULL) return NULL;
X
Xiaoyu Wang 已提交
1312 1313
    }

L
Liu Jicong 已提交
1314 1315
    if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
L
Liu Jicong 已提交
1316
      /*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/
L
fix  
Liu Jicong 已提交
1317
      /*printf("handle poll rsp %d\n", rspMsg->head.mqMsgType);*/
L
Liu Jicong 已提交
1318
      if (pollRspWrapper->msg.head.epoch == atomic_load_32(&tmq->epoch)) {
L
fix  
Liu Jicong 已提交
1319
        /*printf("epoch match\n");*/
L
Liu Jicong 已提交
1320
        SMqClientVg* pVg = pollRspWrapper->vgHandle;
L
Liu Jicong 已提交
1321
        /*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/
L
Liu Jicong 已提交
1322
        pVg->currentOffset = pollRspWrapper->msg.rspOffset;
X
Xiaoyu Wang 已提交
1323
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
L
Liu Jicong 已提交
1324
        if (pollRspWrapper->msg.blockNum == 0) {
L
Liu Jicong 已提交
1325 1326
          taosFreeQitem(pollRspWrapper);
          rspWrapper = NULL;
L
temp  
Liu Jicong 已提交
1327 1328
          continue;
        }
L
Liu Jicong 已提交
1329
        // build rsp
L
Liu Jicong 已提交
1330 1331
        SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
        return pRsp;
X
Xiaoyu Wang 已提交
1332
      } else {
L
Liu Jicong 已提交
1333
        /*printf("epoch mismatch\n");*/
L
Liu Jicong 已提交
1334
        taosFreeQitem(pollRspWrapper);
X
Xiaoyu Wang 已提交
1335 1336
      }
    } else {
L
fix  
Liu Jicong 已提交
1337
      /*printf("handle ep rsp %d\n", rspMsg->head.mqMsgType);*/
X
Xiaoyu Wang 已提交
1338
      bool reset = false;
L
Liu Jicong 已提交
1339 1340
      tmqHandleNoPollRsp(tmq, rspWrapper, &reset);
      taosFreeQitem(rspWrapper);
X
Xiaoyu Wang 已提交
1341
      if (pollIfReset && reset) {
L
add log  
Liu Jicong 已提交
1342
        tscDebug("consumer %ld reset and repoll", tmq->consumerId);
X
Xiaoyu Wang 已提交
1343 1344 1345 1346 1347 1348
        tmqPollImpl(tmq, blockingTime);
      }
    }
  }
}

L
Liu Jicong 已提交
1349
#if 0
L
Liu Jicong 已提交
1350
tmq_message_t* tmq_consumer_poll_v1(tmq_t* tmq, int64_t blocking_time) {
X
Xiaoyu Wang 已提交
1351 1352
  tmq_message_t* rspMsg = NULL;
  int64_t        startTime = taosGetTimestampMs();
1353 1354

  int64_t status = atomic_load_64(&tmq->status);
X
Xiaoyu Wang 已提交
1355 1356
  tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT);

L
Liu Jicong 已提交
1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369
  while (1) {
    rspMsg = tmqSyncPollImpl(tmq, blocking_time);
    if (rspMsg && rspMsg->consumeRsp.numOfTopics) {
      return rspMsg;
    }

    if (blocking_time != 0) {
      int64_t endTime = taosGetTimestampMs();
      if (endTime - startTime > blocking_time) {
        return NULL;
      }
    } else
      return NULL;
X
Xiaoyu Wang 已提交
1370
  }
L
Liu Jicong 已提交
1371
}
L
Liu Jicong 已提交
1372
#endif
X
Xiaoyu Wang 已提交
1373

L
Liu Jicong 已提交
1374 1375 1376
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
  SMqRspObj* rspObj;
  int64_t    startTime = taosGetTimestampMs();
L
Liu Jicong 已提交
1377

L
Liu Jicong 已提交
1378
  // TODO: put into another thread or delayed queue
1379
  int64_t status = atomic_load_64(&tmq->status);
L
Liu Jicong 已提交
1380 1381 1382 1383
  while (0 != tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT)) {
    tscDebug("not ready, retry\n");
    taosSsleep(1);
  }
L
Liu Jicong 已提交
1384

L
Liu Jicong 已提交
1385 1386 1387
  rspObj = tmqHandleAllRsp(tmq, blocking_time, false);
  if (rspObj) {
    return (TAOS_RES*)rspObj;
L
fix  
Liu Jicong 已提交
1388
  }
X
Xiaoyu Wang 已提交
1389 1390 1391

  while (1) {
    /*printf("cycle\n");*/
L
Liu Jicong 已提交
1392
    tmqAskEp(tmq, false);
L
Liu Jicong 已提交
1393
    tmqPollImpl(tmq, blocking_time);
L
Liu Jicong 已提交
1394

L
temp  
Liu Jicong 已提交
1395
    /*tsem_wait(&tmq->rspSem);*/
L
Liu Jicong 已提交
1396

L
Liu Jicong 已提交
1397 1398 1399
    rspObj = tmqHandleAllRsp(tmq, blocking_time, false);
    if (rspObj) {
      return (TAOS_RES*)rspObj;
X
Xiaoyu Wang 已提交
1400 1401 1402 1403
    }
    if (blocking_time != 0) {
      int64_t endTime = taosGetTimestampMs();
      if (endTime - startTime > blocking_time) {
L
Liu Jicong 已提交
1404
        tscDebug("consumer %ld (epoch %d) timeout, no rsp", tmq->consumerId, tmq->epoch);
X
Xiaoyu Wang 已提交
1405 1406 1407 1408 1409 1410 1411
        return NULL;
      }
    }
  }
}

#if 0
1412

L
Liu Jicong 已提交
1413
  if (blocking_time <= 0) blocking_time = 1;
L
Liu Jicong 已提交
1414 1415
  if (blocking_time > 1000) blocking_time = 1000;
  /*blocking_time = 1;*/
1416 1417 1418

  if (taosArrayGetSize(tmq->clientTopics) == 0) {
    tscDebug("consumer:%ld poll but not assigned", tmq->consumerId);
L
Liu Jicong 已提交
1419
    /*printf("over1\n");*/
wafwerar's avatar
wafwerar 已提交
1420
    taosMsleep(blocking_time);
1421 1422 1423 1424
    return NULL;
  }
  SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, tmq->nextTopicIdx);
  if (taosArrayGetSize(pTopic->vgs) == 0) {
L
Liu Jicong 已提交
1425
    /*printf("over2\n");*/
wafwerar's avatar
wafwerar 已提交
1426
    taosMsleep(blocking_time);
1427 1428 1429
    return NULL;
  }

L
Liu Jicong 已提交
1430
  tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics);
L
Liu Jicong 已提交
1431
  int32_t beginVgIdx = pTopic->nextVgIdx;
L
Liu Jicong 已提交
1432
  while (1) {
L
Liu Jicong 已提交
1433 1434 1435
    pTopic->nextVgIdx = (pTopic->nextVgIdx + 1) % taosArrayGetSize(pTopic->vgs);
    SMqClientVg* pVg = taosArrayGet(pTopic->vgs, pTopic->nextVgIdx);
    /*printf("consume vg %d, offset %ld\n", pVg->vgId, pVg->currentOffset);*/
L
Liu Jicong 已提交
1436
    SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blocking_time, pTopic, pVg);
L
Liu Jicong 已提交
1437 1438
    if (pReq == NULL) {
      ASSERT(false);
wafwerar's avatar
wafwerar 已提交
1439
      taosMsleep(blocking_time);
L
Liu Jicong 已提交
1440 1441
      return NULL;
    }
1442

wafwerar's avatar
wafwerar 已提交
1443
    SMqPollCbParam* param = taosMemoryMalloc(sizeof(SMqPollCbParam));
L
Liu Jicong 已提交
1444 1445
    if (param == NULL) {
      ASSERT(false);
wafwerar's avatar
wafwerar 已提交
1446
      taosMsleep(blocking_time);
L
Liu Jicong 已提交
1447 1448 1449 1450 1451 1452 1453 1454
      return NULL;
    }
    param->tmq = tmq;
    param->retMsg = &tmq_message;
    param->pVg = pVg;
    tsem_init(&param->rspSem, 0, 0);

    SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME);
X
Xiaoyu Wang 已提交
1455 1456 1457 1458 1459
    pRequest->body.requestMsg = (SDataBuf){
        .pData = pReq,
        .len = sizeof(SMqConsumeReq),
        .handle = NULL,
    };
1460

L
Liu Jicong 已提交
1461 1462 1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473
    SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
    sendInfo->requestObjRefId = 0;
    sendInfo->param = param;
    sendInfo->fp = tmqPollCb;

    /*printf("req offset: %ld\n", pReq->offset);*/

    int64_t transporterId = 0;
    asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
    tmq->pollCnt++;

    tsem_wait(&param->rspSem);
    tsem_destroy(&param->rspSem);
wafwerar's avatar
wafwerar 已提交
1474
    taosMemoryFree(param);
L
Liu Jicong 已提交
1475 1476 1477

    if (tmq_message == NULL) {
      if (beginVgIdx == pTopic->nextVgIdx) {
wafwerar's avatar
wafwerar 已提交
1478
        taosMsleep(blocking_time);
L
Liu Jicong 已提交
1479 1480 1481 1482
      } else {
        continue;
      }
    }
L
Liu Jicong 已提交
1483

L
Liu Jicong 已提交
1484
    return tmq_message;
L
Liu Jicong 已提交
1485
  }
1486 1487 1488 1489

  /*tsem_wait(&pRequest->body.rspSem);*/

  /*if (body != NULL) {*/
L
Liu Jicong 已提交
1490
  /*destroySendMsgInfo(body);*/
1491 1492 1493
  /*}*/

  /*if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {*/
L
Liu Jicong 已提交
1494
  /*pRequest->code = terrno;*/
1495 1496 1497 1498
  /*}*/

  /*return pRequest;*/
}
X
Xiaoyu Wang 已提交
1499
#endif
1500

L
Liu Jicong 已提交
1501
#if 0
L
Liu Jicong 已提交
1502
tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async) {
L
Liu Jicong 已提交
1503
  if (tmq_topic_vgroup_list != NULL) {
L
Liu Jicong 已提交
1504
    // TODO
L
Liu Jicong 已提交
1505 1506
  }

L
Liu Jicong 已提交
1507
  // TODO: change semaphore to gate
L
Liu Jicong 已提交
1508 1509 1510
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
L
Liu Jicong 已提交
1511
      SMqClientVg*   pVg = taosArrayGet(pTopic->vgs, j);
L
Liu Jicong 已提交
1512
      SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, 0, pTopic, pVg);
L
Liu Jicong 已提交
1513

L
Liu Jicong 已提交
1514 1515
      SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME);
      pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq)};
wafwerar's avatar
wafwerar 已提交
1516
      SMqCommitCbParam* pParam = taosMemoryMalloc(sizeof(SMqCommitCbParam));
L
Liu Jicong 已提交
1517 1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536
      if (pParam == NULL) {
        continue;
      }
      pParam->tmq = tmq;
      pParam->pVg = pVg;
      pParam->async = async;
      if (!async) tsem_init(&pParam->rspSem, 0, 0);

      SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
      sendInfo->requestObjRefId = 0;
      sendInfo->param = pParam;
      sendInfo->fp = tmqCommitCb;

      int64_t transporterId = 0;
      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);

      if (!async) tsem_wait(&pParam->rspSem);
    }
  }

L
Liu Jicong 已提交
1537
  return 0;
1538
}
L
Liu Jicong 已提交
1539
#endif
1540

L
Liu Jicong 已提交
1541
#if 0
1542 1543
void tmq_message_destroy(tmq_message_t* tmq_message) {
  if (tmq_message == NULL) return;
L
Liu Jicong 已提交
1544
  SMqPollRsp* pRsp = &tmq_message->msg;
L
Liu Jicong 已提交
1545
  tDeleteSMqConsumeRsp(pRsp);
wafwerar's avatar
wafwerar 已提交
1546
  /*taosMemoryFree(tmq_message);*/
X
Xiaoyu Wang 已提交
1547
  taosFreeQitem(tmq_message);
1548
}
L
Liu Jicong 已提交
1549
#endif
1550

L
Liu Jicong 已提交
1551
tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { return TMQ_RESP_ERR__SUCCESS; }
L
Liu Jicong 已提交
1552 1553 1554 1555 1556 1557 1558

const char* tmq_err2str(tmq_resp_err_t err) {
  if (err == TMQ_RESP_ERR__SUCCESS) {
    return "success";
  }
  return "fail";
}
L
Liu Jicong 已提交
1559

L
Liu Jicong 已提交
1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583
char* tmq_get_topic_name(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
    return pRspObj->topic;
  } else {
    return NULL;
  }
}

int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
    return pRspObj->vgId;
  } else {
    return -1;
  }
}

void tmq_message_destroy(TAOS_RES* res) {
  if (res == NULL) return;
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
  }
}