tmq.c 47.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
#include "planner.h"
H
Haojun Liao 已提交
20
#include "tdatablock.h"
21 22 23
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
X
Xiaoyu Wang 已提交
24
#include "tqueue.h"
25 26
#include "tref.h"

L
Liu Jicong 已提交
27 28 29 30
struct tmq_message_t {
  SMqPollRsp msg;
  char*      topic;
  SArray*    res;  // SArray<SReqResultInfo>
L
Liu Jicong 已提交
31
  int32_t    vgId;
L
Liu Jicong 已提交
32 33 34
  int32_t    resIter;
};

L
Liu Jicong 已提交
35 36 37 38 39 40 41 42 43 44 45
typedef struct {
  int8_t  tmqRspType;
  int32_t epoch;
} SMqRspWrapper;

typedef struct {
  int8_t           tmqRspType;
  int32_t          epoch;
  SMqCMGetSubEpRsp msg;
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
46
struct tmq_list_t {
L
Liu Jicong 已提交
47
  SArray container;
L
Liu Jicong 已提交
48
};
L
Liu Jicong 已提交
49

L
Liu Jicong 已提交
50
struct tmq_topic_vgroup_t {
L
Liu Jicong 已提交
51
  SMqOffset offset;
L
Liu Jicong 已提交
52 53 54
};

struct tmq_topic_vgroup_list_t {
L
Liu Jicong 已提交
55 56
  int32_t             cnt;
  int32_t             size;
L
Liu Jicong 已提交
57 58 59 60
  tmq_topic_vgroup_t* elems;
};

struct tmq_conf_t {
L
Liu Jicong 已提交
61
  char           clientId[256];
L
Liu Jicong 已提交
62
  char           groupId[TSDB_CGROUP_LEN];
L
Liu Jicong 已提交
63
  int8_t         autoCommit;
L
Liu Jicong 已提交
64
  int8_t         resetOffset;
L
Liu Jicong 已提交
65 66 67 68 69
  uint16_t       port;
  char*          ip;
  char*          user;
  char*          pass;
  char*          db;
L
Liu Jicong 已提交
70
  tmq_commit_cb* commit_cb;
L
Liu Jicong 已提交
71 72 73
};

struct tmq_t {
L
Liu Jicong 已提交
74
  // conf
L
Liu Jicong 已提交
75
  char           groupId[TSDB_CGROUP_LEN];
L
Liu Jicong 已提交
76
  char           clientId[256];
L
Liu Jicong 已提交
77
  int8_t         autoCommit;
L
Liu Jicong 已提交
78
  int8_t         inWaiting;
L
Liu Jicong 已提交
79
  int64_t        consumerId;
L
Liu Jicong 已提交
80
  int32_t        epoch;
L
Liu Jicong 已提交
81
  int32_t        resetOffsetCfg;
L
Liu Jicong 已提交
82 83 84 85
  int64_t        status;
  STscObj*       pTscObj;
  tmq_commit_cb* commit_cb;
  int32_t        nextTopicIdx;
L
fix txn  
Liu Jicong 已提交
86
  int8_t         epStatus;
L
temp  
Liu Jicong 已提交
87
  int32_t        epSkipCnt;
L
fix  
Liu Jicong 已提交
88
  int32_t        waitingRequest;
L
Liu Jicong 已提交
89
  int32_t        readyRequest;
L
Liu Jicong 已提交
90
  SArray*        clientTopics;  // SArray<SMqClientTopic>
X
Xiaoyu Wang 已提交
91 92
  STaosQueue*    mqueue;        // queue of tmq_message_t
  STaosQall*     qall;
L
Liu Jicong 已提交
93
  tsem_t         rspSem;
L
Liu Jicong 已提交
94 95
  // stat
  int64_t pollCnt;
L
Liu Jicong 已提交
96 97
};

X
Xiaoyu Wang 已提交
98 99 100 101 102 103 104 105
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT,
};

enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY,
L
Liu Jicong 已提交
106 107
};

L
Liu Jicong 已提交
108
typedef struct {
109 110 111 112
  // statistics
  int64_t pollCnt;
  // offset
  int64_t currentOffset;
L
Liu Jicong 已提交
113
  // connection info
114
  int32_t vgId;
X
Xiaoyu Wang 已提交
115
  int32_t vgStatus;
L
Liu Jicong 已提交
116
  int32_t vgSkipCnt;
117 118 119
  SEpSet  epSet;
} SMqClientVg;

L
Liu Jicong 已提交
120
typedef struct {
121
  // subscribe info
L
Liu Jicong 已提交
122 123 124 125 126 127 128 129
  int32_t     sqlLen;
  char*       sql;
  char*       topicName;
  int64_t     topicId;
  SArray*     vgs;  // SArray<SMqClientVg>
  int8_t      isSchemaAdaptive;
  int32_t     numOfFields;
  TAOS_FIELD* fields;
130 131
} SMqClientTopic;

L
Liu Jicong 已提交
132 133 134 135 136 137 138 139
typedef struct {
  int8_t          tmqRspType;
  int32_t         epoch;
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
  SMqPollRspV2    msg;
} SMqPollRspWrapper;

L
Liu Jicong 已提交
140
typedef struct {
L
Liu Jicong 已提交
141 142 143 144
  tmq_t*         tmq;
  tsem_t         rspSem;
  tmq_resp_err_t rspErr;
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
145

L
Liu Jicong 已提交
146
typedef struct {
147
  tmq_t*  tmq;
X
Xiaoyu Wang 已提交
148 149
  int32_t sync;
  tsem_t  rspSem;
150 151
} SMqAskEpCbParam;

L
Liu Jicong 已提交
152
typedef struct {
L
Liu Jicong 已提交
153 154
  tmq_t*          tmq;
  SMqClientVg*    pVg;
L
Liu Jicong 已提交
155
  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
156
  int32_t         epoch;
L
Liu Jicong 已提交
157
  int32_t         vgId;
L
Liu Jicong 已提交
158 159
  tsem_t          rspSem;
  int32_t         sync;
X
Xiaoyu Wang 已提交
160
} SMqPollCbParam;
161

L
Liu Jicong 已提交
162
typedef struct {
L
Liu Jicong 已提交
163
  tmq_t*         tmq;
L
Liu Jicong 已提交
164
  int32_t        async;
L
Liu Jicong 已提交
165 166
  tsem_t         rspSem;
  tmq_resp_err_t rspErr;
L
Liu Jicong 已提交
167
  /*SMqClientVg* pVg;*/
L
Liu Jicong 已提交
168
} SMqCommitCbParam;
L
Liu Jicong 已提交
169

170
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
171
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
L
Liu Jicong 已提交
172
  conf->autoCommit = false;
X
Xiaoyu Wang 已提交
173
  conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
174 175 176
  return conf;
}

L
Liu Jicong 已提交
177
void tmq_conf_destroy(tmq_conf_t* conf) {
wafwerar's avatar
wafwerar 已提交
178
  if (conf) taosMemoryFree(conf);
L
Liu Jicong 已提交
179 180 181
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
182 183
  if (strcmp(key, "group.id") == 0) {
    strcpy(conf->groupId, value);
L
Liu Jicong 已提交
184
    return TMQ_CONF_OK;
185
  }
L
Liu Jicong 已提交
186

187 188
  if (strcmp(key, "client.id") == 0) {
    strcpy(conf->clientId, value);
L
Liu Jicong 已提交
189 190
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
191

L
Liu Jicong 已提交
192 193
  if (strcmp(key, "enable.auto.commit") == 0) {
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
194
      conf->autoCommit = true;
L
Liu Jicong 已提交
195 196
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
197
      conf->autoCommit = false;
L
Liu Jicong 已提交
198 199 200 201
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
202
  }
L
Liu Jicong 已提交
203

L
Liu Jicong 已提交
204 205 206 207 208 209 210 211 212 213 214 215 216 217
  if (strcmp(key, "auto.offset.reset") == 0) {
    if (strcmp(value, "none") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "earliest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
218

L
Liu Jicong 已提交
219
  if (strcmp(key, "td.connect.ip") == 0) {
L
Liu Jicong 已提交
220 221 222
    conf->ip = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
223
  if (strcmp(key, "td.connect.user") == 0) {
L
Liu Jicong 已提交
224 225 226
    conf->user = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
227
  if (strcmp(key, "td.connect.pass") == 0) {
L
Liu Jicong 已提交
228 229 230
    conf->pass = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
231
  if (strcmp(key, "td.connect.port") == 0) {
L
Liu Jicong 已提交
232 233 234
    conf->port = atoi(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
235
  if (strcmp(key, "td.connect.db") == 0) {
L
Liu Jicong 已提交
236 237 238 239
    conf->db = strdup(value);
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
240
  return TMQ_CONF_UNKNOWN;
241 242 243
}

tmq_list_t* tmq_list_new() {
L
Liu Jicong 已提交
244 245
  //
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
246 247
}

L
Liu Jicong 已提交
248 249 250
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  SArray* container = &list->container;
  char*   topic = strdup(src);
L
fix  
Liu Jicong 已提交
251
  if (taosArrayPush(container, &topic) == NULL) return -1;
252 253 254
  return 0;
}

L
Liu Jicong 已提交
255
void tmq_list_destroy(tmq_list_t* list) {
L
Liu Jicong 已提交
256
  SArray* container = &list->container;
L
fix  
Liu Jicong 已提交
257 258
  /*taosArrayDestroy(container);*/
  taosArrayDestroyEx(container, (void (*)(void*))taosMemoryFree);
L
Liu Jicong 已提交
259 260
}

L
Liu Jicong 已提交
261 262 263 264
static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
  return sprintf(dst, "%s:%d", topicName, vg);
}

L
Liu Jicong 已提交
265
void tmqClearUnhandleMsg(tmq_t* tmq) {
L
Liu Jicong 已提交
266
  SMqRspWrapper* msg = NULL;
L
Liu Jicong 已提交
267 268 269 270 271 272 273 274
  while (1) {
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg)
      taosFreeQitem(msg);
    else
      break;
  }

L
fix  
Liu Jicong 已提交
275
  msg = NULL;
L
Liu Jicong 已提交
276 277 278 279 280 281 282 283 284 285
  taosReadAllQitems(tmq->mqueue, tmq->qall);
  while (1) {
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg)
      taosFreeQitem(msg);
    else
      break;
  }
}

L
Liu Jicong 已提交
286 287 288 289 290 291
int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) {
  SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
  pParam->rspErr = code;
  tsem_post(&pParam->rspSem);
  return 0;
}
292

L
Liu Jicong 已提交
293
int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
L
Liu Jicong 已提交
294
  SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
L
Liu Jicong 已提交
295
  pParam->rspErr = code == 0 ? TMQ_RESP_ERR__SUCCESS : TMQ_RESP_ERR__FAIL;
L
Liu Jicong 已提交
296
  if (pParam->tmq->commit_cb) {
L
Liu Jicong 已提交
297
    pParam->tmq->commit_cb(pParam->tmq, pParam->rspErr, NULL, NULL);
L
Liu Jicong 已提交
298
  }
L
fix  
Liu Jicong 已提交
299
  if (!pParam->async) tsem_post(&pParam->rspSem);
L
Liu Jicong 已提交
300 301 302
  return 0;
}

X
Xiaoyu Wang 已提交
303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318
tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  if (*topics == NULL) {
    *topics = tmq_list_new();
  }
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* topic = taosArrayGetP(tmq->clientTopics, i);
    tmq_list_append(*topics, strdup(topic->topicName));
  }
  return TMQ_RESP_ERR__SUCCESS;
}

tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq) {
  tmq_list_t* lst = tmq_list_new();
  return tmq_subscribe(tmq, lst);
}

319
tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
wafwerar's avatar
wafwerar 已提交
320
  tmq_t* pTmq = taosMemoryCalloc(sizeof(tmq_t), 1);
321 322 323 324
  if (pTmq == NULL) {
    return NULL;
  }
  pTmq->pTscObj = (STscObj*)conn;
L
Liu Jicong 已提交
325
  pTmq->inWaiting = 0;
326 327
  pTmq->status = 0;
  pTmq->pollCnt = 0;
L
Liu Jicong 已提交
328
  pTmq->epoch = 0;
L
fix  
Liu Jicong 已提交
329
  pTmq->waitingRequest = 0;
L
Liu Jicong 已提交
330
  pTmq->readyRequest = 0;
L
fix txn  
Liu Jicong 已提交
331
  pTmq->epStatus = 0;
L
temp  
Liu Jicong 已提交
332
  pTmq->epSkipCnt = 0;
L
Liu Jicong 已提交
333
  // set conf
334 335
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
L
Liu Jicong 已提交
336
  pTmq->autoCommit = conf->autoCommit;
337
  pTmq->commit_cb = conf->commit_cb;
L
Liu Jicong 已提交
338
  pTmq->resetOffsetCfg = conf->resetOffset;
L
Liu Jicong 已提交
339

L
Liu Jicong 已提交
340 341 342 343 344 345 346 347 348 349
  pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1);
  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  if (pTmq->clientTopics == NULL) {
    taosMemoryFree(pTmq);
    return NULL;
  }

  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();

L
Liu Jicong 已提交
350 351
  tsem_init(&pTmq->rspSem, 0, 0);

L
Liu Jicong 已提交
352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367
  return pTmq;
}

tmq_t* tmq_consumer_new1(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    return NULL;
  }
  pTmq->pTscObj = taos_connect(conf->ip, conf->user, conf->pass, conf->db, conf->port);

  pTmq->inWaiting = 0;
  pTmq->status = 0;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;
  pTmq->waitingRequest = 0;
  pTmq->readyRequest = 0;
L
temp  
Liu Jicong 已提交
368 369
  pTmq->epStatus = 0;
  pTmq->epSkipCnt = 0;
L
Liu Jicong 已提交
370 371 372 373 374 375 376
  // set conf
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
  pTmq->autoCommit = conf->autoCommit;
  pTmq->commit_cb = conf->commit_cb;
  pTmq->resetOffsetCfg = conf->resetOffset;

L
Liu Jicong 已提交
377
  pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1);
378
  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
L
Liu Jicong 已提交
379 380 381 382
  if (pTmq->clientTopics == NULL) {
    taosMemoryFree(pTmq);
    return NULL;
  }
X
Xiaoyu Wang 已提交
383 384 385

  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();
L
Liu Jicong 已提交
386 387 388

  tsem_init(&pTmq->rspSem, 0, 0);

389 390 391
  return pTmq;
}

L
Liu Jicong 已提交
392 393 394 395
tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) {
  // TODO: add read write lock
  SRequestObj*   pRequest = NULL;
  tmq_resp_err_t resp = TMQ_RESP_ERR__SUCCESS;
L
Liu Jicong 已提交
396 397
  // build msg
  // send to mnode
L
Liu Jicong 已提交
398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420
  SMqCMCommitOffsetReq req;
  SArray*              pArray = NULL;

  if (offsets == NULL) {
    pArray = taosArrayInit(0, sizeof(SMqOffset));
    for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
      SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
      for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
        SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
        SMqOffset    offset;
        strcpy(offset.topicName, pTopic->topicName);
        strcpy(offset.cgroup, tmq->groupId);
        offset.vgId = pVg->vgId;
        offset.offset = pVg->currentOffset;
        taosArrayPush(pArray, &offset);
      }
    }
    req.num = pArray->size;
    req.offsets = pArray->pData;
  } else {
    req.num = offsets->cnt;
    req.offsets = (SMqOffset*)offsets->elems;
  }
L
Liu Jicong 已提交
421 422 423 424

  SCoder encoder;

  tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
L
Liu Jicong 已提交
425
  tEncodeSMqCMCommitOffsetReq(&encoder, &req);
L
Liu Jicong 已提交
426
  int32_t tlen = encoder.pos;
wafwerar's avatar
wafwerar 已提交
427
  void*   buf = taosMemoryMalloc(tlen);
L
Liu Jicong 已提交
428 429 430 431 432 433 434
  if (buf == NULL) {
    tCoderClear(&encoder);
    return -1;
  }
  tCoderClear(&encoder);

  tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, tlen, TD_ENCODER);
L
Liu Jicong 已提交
435
  tEncodeSMqCMCommitOffsetReq(&encoder, &req);
L
Liu Jicong 已提交
436 437
  tCoderClear(&encoder);

L
Liu Jicong 已提交
438
  pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_MQ_COMMIT_OFFSET);
L
Liu Jicong 已提交
439 440 441 442
  if (pRequest == NULL) {
    tscError("failed to malloc request");
  }

wafwerar's avatar
wafwerar 已提交
443
  SMqCommitCbParam* pParam = taosMemoryMalloc(sizeof(SMqCommitCbParam));
L
Liu Jicong 已提交
444 445 446 447 448
  if (pParam == NULL) {
    return -1;
  }
  pParam->tmq = tmq;
  tsem_init(&pParam->rspSem, 0, 0);
L
fix  
Liu Jicong 已提交
449
  pParam->async = async;
L
Liu Jicong 已提交
450

X
Xiaoyu Wang 已提交
451 452 453 454 455
  pRequest->body.requestMsg = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };
L
Liu Jicong 已提交
456 457

  SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
L
Liu Jicong 已提交
458 459
  sendInfo->param = pParam;
  sendInfo->fp = tmqCommitCb;
L
Liu Jicong 已提交
460 461 462 463 464
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

L
Liu Jicong 已提交
465 466 467 468
  if (!async) {
    tsem_wait(&pParam->rspSem);
    resp = pParam->rspErr;
  }
L
Liu Jicong 已提交
469

L
fix  
Liu Jicong 已提交
470
  tsem_destroy(&pParam->rspSem);
wafwerar's avatar
wafwerar 已提交
471
  taosMemoryFree(pParam);
L
fix  
Liu Jicong 已提交
472

L
Liu Jicong 已提交
473 474 475 476 477
  if (pArray) {
    taosArrayDestroy(pArray);
  }

  return resp;
L
Liu Jicong 已提交
478 479
}

L
Liu Jicong 已提交
480
tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
L
Liu Jicong 已提交
481
  SRequestObj* pRequest = NULL;
L
Liu Jicong 已提交
482 483
  SArray*      container = &topic_list->container;
  int32_t      sz = taosArrayGetSize(container);
L
Liu Jicong 已提交
484
  // destroy ex
485 486 487 488 489 490 491 492 493 494
  taosArrayDestroy(tmq->clientTopics);
  tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic));

  SCMSubscribeReq req;
  req.topicNum = sz;
  req.consumerId = tmq->consumerId;
  req.consumerGroup = strdup(tmq->groupId);
  req.topicNames = taosArrayInit(sz, sizeof(void*));

  for (int i = 0; i < sz; i++) {
L
Liu Jicong 已提交
495 496
    /*char* topicName = topic_list->elems[i];*/
    char* topicName = taosArrayGetP(container, i);
497 498 499

    SName name = {0};
    char* dbName = getDbOfConnection(tmq->pTscObj);
L
Liu Jicong 已提交
500 501 502
    if (dbName == NULL) {
      return TMQ_RESP_ERR__FAIL;
    }
L
Liu Jicong 已提交
503
    tNameSetDbName(&name, tmq->pTscObj->acctId, dbName, strlen(dbName));
504 505
    tNameFromString(&name, topicName, T_NAME_TABLE);

wafwerar's avatar
wafwerar 已提交
506
    char* topicFname = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
507
    if (topicFname == NULL) {
L
Liu Jicong 已提交
508
      goto _return;
509 510 511 512
    }
    tNameExtractFullName(&name, topicFname);
    tscDebug("subscribe topic: %s", topicFname);
    SMqClientTopic topic = {
L
Liu Jicong 已提交
513 514 515 516 517 518
        .sql = NULL,
        .sqlLen = 0,
        .topicId = 0,
        .topicName = topicFname,
        .vgs = NULL,
    };
519
    topic.vgs = taosArrayInit(0, sizeof(SMqClientVg));
L
Liu Jicong 已提交
520
    taosArrayPush(tmq->clientTopics, &topic);
521
    taosArrayPush(req.topicNames, &topicFname);
wafwerar's avatar
wafwerar 已提交
522
    taosMemoryFree(dbName);
523 524
  }

L
Liu Jicong 已提交
525
  int   tlen = tSerializeSCMSubscribeReq(NULL, &req);
wafwerar's avatar
wafwerar 已提交
526
  void* buf = taosMemoryMalloc(tlen);
L
Liu Jicong 已提交
527
  if (buf == NULL) {
528 529 530 531 532 533 534 535 536
    goto _return;
  }

  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);
  /*printf("formatted: %s\n", dagStr);*/

  pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_SUBSCRIBE);
  if (pRequest == NULL) {
L
Liu Jicong 已提交
537
    tscError("failed to malloc request");
538 539
  }

X
Xiaoyu Wang 已提交
540 541 542 543
  SMqSubscribeCbParam param = {
      .rspErr = TMQ_RESP_ERR__SUCCESS,
      .tmq = tmq,
  };
L
Liu Jicong 已提交
544 545
  tsem_init(&param.rspSem, 0, 0);

X
Xiaoyu Wang 已提交
546 547 548 549 550
  pRequest->body.requestMsg = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };
551 552

  SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
L
Liu Jicong 已提交
553 554
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
555 556 557 558 559
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

L
Liu Jicong 已提交
560 561
  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);
562 563 564

_return:
  /*if (sendInfo != NULL) {*/
L
Liu Jicong 已提交
565
  /*destroySendMsgInfo(sendInfo);*/
566 567
  /*}*/

L
Liu Jicong 已提交
568
  return param.rspErr;
569 570
}

L
Liu Jicong 已提交
571
void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) { conf->commit_cb = cb; }
572

L
Liu Jicong 已提交
573 574 575 576 577 578 579 580 581 582 583 584 585 586 587
TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbName, const char* sql) {
  STscObj*     pTscObj = (STscObj*)taos;
  SRequestObj* pRequest = NULL;
  SQuery*      pQueryNode = NULL;
  char*        astStr = NULL;
  int32_t      sqlLen;

  terrno = TSDB_CODE_SUCCESS;
  if (taos == NULL || streamName == NULL || sql == NULL) {
    tscError("invalid parameters for creating stream, connObj:%p, stream name:%s, sql:%s", taos, streamName, sql);
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    goto _return;
  }
  sqlLen = strlen(sql);

L
Liu Jicong 已提交
588 589
  if (strlen(tbName) >= TSDB_TABLE_NAME_LEN) {
    tscError("output tb name too long, max length:%d", TSDB_TABLE_NAME_LEN - 1);
L
Liu Jicong 已提交
590 591 592 593 594 595 596 597 598 599 600 601
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    goto _return;
  }

  if (sqlLen > TSDB_MAX_ALLOWED_SQL_LEN) {
    tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    goto _return;
  }

  tscDebug("start to create stream: %s", streamName);

H
Haojun Liao 已提交
602
  int32_t code = 0;
L
Liu Jicong 已提交
603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621
  CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
  CHECK_CODE_GOTO(parseSql(pRequest, false, &pQueryNode), _return);

  // todo check for invalid sql statement and return with error code

  CHECK_CODE_GOTO(nodesNodeToString(pQueryNode->pRoot, false, &astStr, NULL), _return);

  /*printf("%s\n", pStr);*/

  SName name = {.acctId = pTscObj->acctId, .type = TSDB_TABLE_NAME_T};
  strcpy(name.dbname, pRequest->pDb);
  strcpy(name.tname, streamName);

  SCMCreateStreamReq req = {
      .igExists = 1,
      .ast = astStr,
      .sql = (char*)sql,
  };
  tNameExtractFullName(&name, req.name);
L
Liu Jicong 已提交
622
  strcpy(req.outputSTbName, tbName);
L
Liu Jicong 已提交
623 624

  int   tlen = tSerializeSCMCreateStreamReq(NULL, 0, &req);
wafwerar's avatar
wafwerar 已提交
625
  void* buf = taosMemoryMalloc(tlen);
L
Liu Jicong 已提交
626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648
  if (buf == NULL) {
    goto _return;
  }

  tSerializeSCMCreateStreamReq(buf, tlen, &req);
  /*printf("formatted: %s\n", dagStr);*/

  pRequest->body.requestMsg = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };
  pRequest->type = TDMT_MND_CREATE_STREAM;

  SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
  SEpSet        epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  tsem_wait(&pRequest->body.rspSem);

_return:
wafwerar's avatar
wafwerar 已提交
649
  taosMemoryFreeClear(astStr);
L
Liu Jicong 已提交
650 651 652 653 654 655 656 657 658 659 660 661
  qDestroyQuery(pQueryNode);
  /*if (sendInfo != NULL) {*/
  /*destroySendMsgInfo(sendInfo);*/
  /*}*/

  if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
    pRequest->code = terrno;
  }

  return pRequest;
}

L
Liu Jicong 已提交
662
#if 0
L
Liu Jicong 已提交
663 664 665
TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, int sqlLen) {
  STscObj*     pTscObj = (STscObj*)taos;
  SRequestObj* pRequest = NULL;
L
Liu Jicong 已提交
666
  SQuery*      pQueryNode = NULL;
L
Liu Jicong 已提交
667
  char*        astStr = NULL;
668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687

  terrno = TSDB_CODE_SUCCESS;
  if (taos == NULL || topicName == NULL || sql == NULL) {
    tscError("invalid parameters for creating topic, connObj:%p, topic name:%s, sql:%s", taos, topicName, sql);
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    goto _return;
  }

  if (strlen(topicName) >= TSDB_TOPIC_NAME_LEN) {
    tscError("topic name too long, max length:%d", TSDB_TOPIC_NAME_LEN - 1);
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    goto _return;
  }

  if (sqlLen > TSDB_MAX_ALLOWED_SQL_LEN) {
    tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    goto _return;
  }

L
Liu Jicong 已提交
688
  tscDebug("start to create topic: %s", topicName);
689

X
Xiaoyu Wang 已提交
690
  int32_t code = TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
691 692
  CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
  CHECK_CODE_GOTO(parseSql(pRequest, true, &pQueryNode), _return);
693 694 695

  // todo check for invalid sql statement and return with error code

L
Liu Jicong 已提交
696
  CHECK_CODE_GOTO(nodesNodeToString(pQueryNode->pRoot, false, &astStr, NULL), _return);
697

L
Liu Jicong 已提交
698
  /*printf("%s\n", pStr);*/
699

L
Liu Jicong 已提交
700
  SName name = {.acctId = pTscObj->acctId, .type = TSDB_TABLE_NAME_T};
X
Xiaoyu Wang 已提交
701 702
  strcpy(name.dbname, pRequest->pDb);
  strcpy(name.tname, topicName);
703

L
Liu Jicong 已提交
704
  SCMCreateTopicReq req = {
L
Liu Jicong 已提交
705
      .igExists = 1,
L
Liu Jicong 已提交
706
      .ast = astStr,
L
Liu Jicong 已提交
707
      .sql = (char*)sql,
708
  };
L
Liu Jicong 已提交
709
  tNameExtractFullName(&name, req.name);
710

L
Liu Jicong 已提交
711
  int   tlen = tSerializeSCMCreateTopicReq(NULL, 0, &req);
wafwerar's avatar
wafwerar 已提交
712
  void* buf = taosMemoryMalloc(tlen);
713 714 715 716
  if (buf == NULL) {
    goto _return;
  }

L
Liu Jicong 已提交
717
  tSerializeSCMCreateTopicReq(buf, tlen, &req);
718 719
  /*printf("formatted: %s\n", dagStr);*/

L
Liu Jicong 已提交
720 721 722 723 724
  pRequest->body.requestMsg = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };
725 726 727
  pRequest->type = TDMT_MND_CREATE_TOPIC;

  SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
L
Liu Jicong 已提交
728
  SEpSet        epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
729 730 731 732 733

  int64_t transporterId = 0;
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  tsem_wait(&pRequest->body.rspSem);
X
Xiaoyu Wang 已提交
734

735
_return:
wafwerar's avatar
wafwerar 已提交
736
  taosMemoryFreeClear(astStr);
737 738
  qDestroyQuery(pQueryNode);
  /*if (sendInfo != NULL) {*/
L
Liu Jicong 已提交
739
  /*destroySendMsgInfo(sendInfo);*/
740 741 742 743 744 745 746 747
  /*}*/

  if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
    pRequest->code = terrno;
  }

  return pRequest;
}
L
Liu Jicong 已提交
748
#endif
749

L
Liu Jicong 已提交
750
static char* formatTimestamp(char* buf, int64_t val, int precision) {
751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785
  time_t  tt;
  int32_t ms = 0;
  if (precision == TSDB_TIME_PRECISION_NANO) {
    tt = (time_t)(val / 1000000000);
    ms = val % 1000000000;
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
    tt = (time_t)(val / 1000000);
    ms = val % 1000000;
  } else {
    tt = (time_t)(val / 1000);
    ms = val % 1000;
  }

  /* comment out as it make testcases like select_with_tags.sim fail.
    but in windows, this may cause the call to localtime crash if tt < 0,
    need to find a better solution.
    if (tt < 0) {
      tt = 0;
    }
    */

#ifdef WINDOWS
  if (tt < 0) tt = 0;
#endif
  if (tt <= 0 && ms < 0) {
    tt--;
    if (precision == TSDB_TIME_PRECISION_NANO) {
      ms += 1000000000;
    } else if (precision == TSDB_TIME_PRECISION_MICRO) {
      ms += 1000000;
    } else {
      ms += 1000;
    }
  }

L
Liu Jicong 已提交
786
  struct tm* ptm = localtime(&tt);
787 788 789 790 791 792 793 794 795 796 797 798
  size_t     pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm);

  if (precision == TSDB_TIME_PRECISION_NANO) {
    sprintf(buf + pos, ".%09d", ms);
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
    sprintf(buf + pos, ".%06d", ms);
  } else {
    sprintf(buf + pos, ".%03d", ms);
  }

  return buf;
}
L
Liu Jicong 已提交
799
#if 0
L
Liu Jicong 已提交
800 801
int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) {
  if (tmq_message == NULL) return 0;
L
Liu Jicong 已提交
802
  SMqPollRsp* pRsp = &tmq_message->msg;
L
Liu Jicong 已提交
803 804 805
  return pRsp->skipLogNum;
}

L
Liu Jicong 已提交
806 807 808
void tmqShowMsg(tmq_message_t* tmq_message) {
  if (tmq_message == NULL) return;

L
Liu Jicong 已提交
809 810
  static bool noPrintSchema;
  char        pBuf[128];
L
Liu Jicong 已提交
811
  SMqPollRsp* pRsp = &tmq_message->msg;
L
fix  
Liu Jicong 已提交
812
  int32_t     colNum = 2;
L
Liu Jicong 已提交
813 814 815 816
  if (!noPrintSchema) {
    printf("|");
    for (int32_t i = 0; i < colNum; i++) {
      if (i == 0)
L
Liu Jicong 已提交
817
        printf(" %25s |", pRsp->schema->pSchema[i].name);
L
Liu Jicong 已提交
818
      else
L
Liu Jicong 已提交
819
        printf(" %15s |", pRsp->schema->pSchema[i].name);
L
Liu Jicong 已提交
820 821 822 823
    }
    printf("\n");
    printf("===============================================\n");
    noPrintSchema = true;
824
  }
L
Liu Jicong 已提交
825
  int32_t sz = taosArrayGetSize(pRsp->pBlockData);
826
  for (int32_t i = 0; i < sz; i++) {
L
Liu Jicong 已提交
827 828
    SSDataBlock* pDataBlock = taosArrayGet(pRsp->pBlockData, i);
    int32_t      rows = pDataBlock->info.rows;
829 830 831 832
    for (int32_t j = 0; j < rows; j++) {
      printf("|");
      for (int32_t k = 0; k < colNum; k++) {
        SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
L
Liu Jicong 已提交
833 834
        void*            var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
        switch (pColInfoData->info.type) {
835 836 837 838 839 840 841 842 843 844 845 846 847
          case TSDB_DATA_TYPE_TIMESTAMP:
            formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI);
            printf(" %25s |", pBuf);
            break;
          case TSDB_DATA_TYPE_INT:
          case TSDB_DATA_TYPE_UINT:
            printf(" %15u |", *(uint32_t*)var);
            break;
        }
      }
      printf("\n");
    }
  }
L
Liu Jicong 已提交
848
}
L
Liu Jicong 已提交
849
#endif
L
Liu Jicong 已提交
850 851

int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
L
fix  
Liu Jicong 已提交
852
  /*printf("recv poll\n");*/
X
Xiaoyu Wang 已提交
853 854
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;
  SMqClientVg*    pVg = pParam->pVg;
L
Liu Jicong 已提交
855
  SMqClientTopic* pTopic = pParam->pTopic;
X
Xiaoyu Wang 已提交
856
  tmq_t*          tmq = pParam->tmq;
L
Liu Jicong 已提交
857
  if (code != 0) {
L
Liu Jicong 已提交
858
    tscWarn("msg discard from vg %d, epoch %d, code:%x", pParam->vgId, pParam->epoch, code);
L
fix txn  
Liu Jicong 已提交
859
    goto CREATE_MSG_FAIL;
L
Liu Jicong 已提交
860 861
  }

X
Xiaoyu Wang 已提交
862 863 864
  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch < tmqEpoch) {
L
fix  
Liu Jicong 已提交
865
    /*printf("discard rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch);*/
L
temp  
Liu Jicong 已提交
866
    /*tsem_post(&tmq->rspSem);*/
L
Liu Jicong 已提交
867 868
    tscWarn("msg discard from vg %d since from earlier epoch, rsp epoch %d, current epoch %d", pParam->vgId, msgEpoch,
            tmqEpoch);
X
Xiaoyu Wang 已提交
869 870 871 872
    return 0;
  }

  if (msgEpoch != tmqEpoch) {
L
Liu Jicong 已提交
873
    tscWarn("mismatch rsp from vg %d, epoch %d, current epoch %d", pParam->vgId, msgEpoch, tmqEpoch);
L
fix  
Liu Jicong 已提交
874 875
  } else {
    atomic_sub_fetch_32(&tmq->waitingRequest, 1);
X
Xiaoyu Wang 已提交
876 877
  }

L
Liu Jicong 已提交
878
#if 0
L
Liu Jicong 已提交
879
  if (pParam->sync == 1) {
wafwerar's avatar
wafwerar 已提交
880
    /**pParam->msg = taosMemoryMalloc(sizeof(tmq_message_t));*/
L
Liu Jicong 已提交
881 882 883 884 885 886 887 888 889 890 891 892 893 894
    *pParam->msg = taosAllocateQitem(sizeof(tmq_message_t));
    if (*pParam->msg) {
      memcpy(*pParam->msg, pMsg->pData, sizeof(SMqRspHead));
      tDecodeSMqConsumeRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &((*pParam->msg)->consumeRsp));
      if ((*pParam->msg)->consumeRsp.numOfTopics != 0) {
        pVg->currentOffset = (*pParam->msg)->consumeRsp.rspOffset;
      }
      taosWriteQitem(tmq->mqueue, *pParam->msg);
      tsem_post(&pParam->rspSem);
      return 0;
    }
    tsem_post(&pParam->rspSem);
    return -1;
  }
L
Liu Jicong 已提交
895
#endif
L
Liu Jicong 已提交
896

wafwerar's avatar
wafwerar 已提交
897
  /*SMqConsumeRsp* pRsp = taosMemoryCalloc(1, sizeof(SMqConsumeRsp));*/
L
Liu Jicong 已提交
898 899 900
  /*tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t));*/
  SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper));
  if (pRspWrapper == NULL) {
L
Liu Jicong 已提交
901
    tscWarn("msg discard from vg %d, epoch %d since out of memory", pParam->vgId, pParam->epoch);
L
fix txn  
Liu Jicong 已提交
902
    goto CREATE_MSG_FAIL;
L
Liu Jicong 已提交
903
  }
L
Liu Jicong 已提交
904 905 906 907 908 909
  pRspWrapper->tmqRspType = TMQ_MSG_TYPE__POLL_RSP;
  pRspWrapper->vgHandle = pVg;
  pRspWrapper->topicHandle = pTopic;
  /*memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead));*/
  memcpy(&pRspWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
  tDecodeSMqPollRspV2(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->msg);
L
Liu Jicong 已提交
910 911
  // TODO: alloc mem
  /*pRsp->*/
912
  /*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/
L
Liu Jicong 已提交
913

L
temp  
Liu Jicong 已提交
914
#if 0
L
Liu Jicong 已提交
915
  if (pRsp->msg.numOfTopics == 0) {
L
fix  
Liu Jicong 已提交
916
    /*printf("no data\n");*/
X
Xiaoyu Wang 已提交
917
    taosFreeQitem(pRsp);
L
fix txn  
Liu Jicong 已提交
918
    goto CREATE_MSG_FAIL;
L
Liu Jicong 已提交
919
  }
L
temp  
Liu Jicong 已提交
920
#endif
L
Liu Jicong 已提交
921

L
Liu Jicong 已提交
922 923
  tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld", tmq->consumerId, pVg->vgId,
           pRspWrapper->msg.reqOffset, pRspWrapper->msg.rspOffset);
L
fix  
Liu Jicong 已提交
924

L
Liu Jicong 已提交
925
  taosWriteQitem(tmq->mqueue, pRspWrapper);
L
Liu Jicong 已提交
926
  atomic_add_fetch_32(&tmq->readyRequest, 1);
L
temp  
Liu Jicong 已提交
927
  /*tsem_post(&tmq->rspSem);*/
928
  return 0;
L
Liu Jicong 已提交
929

L
fix txn  
Liu Jicong 已提交
930
CREATE_MSG_FAIL:
L
Liu Jicong 已提交
931 932 933
  if (pParam->epoch == tmq->epoch) {
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }
L
temp  
Liu Jicong 已提交
934
  /*tsem_post(&tmq->rspSem);*/
L
Liu Jicong 已提交
935
  return code;
936 937
}

X
Xiaoyu Wang 已提交
938
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
L
Liu Jicong 已提交
939
  /*printf("call update ep %d\n", epoch);*/
X
Xiaoyu Wang 已提交
940
  bool    set = false;
L
Liu Jicong 已提交
941 942
  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];
L
Liu Jicong 已提交
943 944
  tscDebug("consumer %ld update ep epoch %d to epoch %d, topic num: %d", tmq->consumerId, tmq->epoch, epoch,
           topicNumGet);
L
Liu Jicong 已提交
945 946 947 948 949 950 951 952 953 954 955 956
  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    return false;
  }
  SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pHash == NULL) {
    taosArrayDestroy(newTopics);
    return false;
  }

  // find topic, build hash
  for (int32_t i = 0; i < topicNumGet; i++) {
X
Xiaoyu Wang 已提交
957 958
    SMqClientTopic topic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
L
Liu Jicong 已提交
959
    taosHashClear(pHash);
X
Xiaoyu Wang 已提交
960
    topic.topicName = strdup(pTopicEp->topic);
L
Liu Jicong 已提交
961

L
Liu Jicong 已提交
962
    tscDebug("consumer %ld update topic: %s", tmq->consumerId, topic.topicName);
L
Liu Jicong 已提交
963 964 965 966 967 968
    int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
    for (int32_t j = 0; j < topicNumCur; j++) {
      // find old topic
      SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j);
      if (pTopicCur->vgs && strcmp(pTopicCur->topicName, pTopicEp->topic) == 0) {
        int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
L
Liu Jicong 已提交
969
        tscDebug("consumer %ld new vg num: %d", tmq->consumerId, vgNumCur);
L
Liu Jicong 已提交
970 971 972 973
        if (vgNumCur == 0) break;
        for (int32_t k = 0; k < vgNumCur; k++) {
          SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, k);
          sprintf(vgKey, "%s:%d", topic.topicName, pVgCur->vgId);
L
Liu Jicong 已提交
974
          tscDebug("consumer %ld epoch %d vg %d build %s", tmq->consumerId, epoch, pVgCur->vgId, vgKey);
L
Liu Jicong 已提交
975 976 977 978 979 980 981 982 983
          taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t));
        }
        break;
      }
    }

    int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
    topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
    for (int32_t j = 0; j < vgNumGet; j++) {
X
Xiaoyu Wang 已提交
984
      SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
L
Liu Jicong 已提交
985 986 987
      sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId);
      int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
      int64_t  offset = pVgEp->offset;
L
Liu Jicong 已提交
988
      tscDebug("consumer %ld epoch %d vg %d offset og to %ld", tmq->consumerId, epoch, pVgEp->vgId, offset);
L
Liu Jicong 已提交
989 990
      if (pOffset != NULL) {
        offset = *pOffset;
L
Liu Jicong 已提交
991
        tscDebug("consumer %ld epoch %d vg %d found %s", tmq->consumerId, epoch, pVgEp->vgId, vgKey);
L
Liu Jicong 已提交
992
      }
L
Liu Jicong 已提交
993
      tscDebug("consumer %ld epoch %d vg %d offset set to %ld", tmq->consumerId, epoch, pVgEp->vgId, offset);
X
Xiaoyu Wang 已提交
994 995
      SMqClientVg clientVg = {
          .pollCnt = 0,
L
Liu Jicong 已提交
996
          .currentOffset = offset,
X
Xiaoyu Wang 已提交
997 998 999
          .vgId = pVgEp->vgId,
          .epSet = pVgEp->epSet,
          .vgStatus = TMQ_VG_STATUS__IDLE,
L
Liu Jicong 已提交
1000
          .vgSkipCnt = 0,
X
Xiaoyu Wang 已提交
1001 1002 1003 1004
      };
      taosArrayPush(topic.vgs, &clientVg);
      set = true;
    }
L
Liu Jicong 已提交
1005
    taosArrayPush(newTopics, &topic);
X
Xiaoyu Wang 已提交
1006
  }
L
Liu Jicong 已提交
1007
  if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
L
Liu Jicong 已提交
1008
  taosHashCleanup(pHash);
L
Liu Jicong 已提交
1009
  tmq->clientTopics = newTopics;
X
Xiaoyu Wang 已提交
1010 1011 1012 1013
  atomic_store_32(&tmq->epoch, epoch);
  return set;
}

L
Liu Jicong 已提交
1014
int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
1015
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
L
Liu Jicong 已提交
1016
  tmq_t*           tmq = pParam->tmq;
1017
  if (code != 0) {
L
temp  
Liu Jicong 已提交
1018
    tscError("consumer %ld get topic endpoint error, not ready, wait:%d", tmq->consumerId, pParam->sync);
L
Liu Jicong 已提交
1019
    goto END;
1020
  }
L
Liu Jicong 已提交
1021

L
Liu Jicong 已提交
1022
  // tmq's epoch is monotonically increase,
L
Liu Jicong 已提交
1023
  // so it's safe to discard any old epoch msg.
L
Liu Jicong 已提交
1024
  // Epoch will only increase when received newer epoch ep msg
L
Liu Jicong 已提交
1025 1026
  SMqRspHead* head = pMsg->pData;
  int32_t     epoch = atomic_load_32(&tmq->epoch);
L
temp  
Liu Jicong 已提交
1027
  tscDebug("consumer %ld recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
L
Liu Jicong 已提交
1028 1029
  if (head->epoch <= epoch) {
    goto END;
1030
  }
L
Liu Jicong 已提交
1031

X
Xiaoyu Wang 已提交
1032 1033 1034 1035 1036
  if (pParam->sync) {
    SMqCMGetSubEpRsp rsp;
    tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
    /*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/
    /*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/
L
Liu Jicong 已提交
1037
    if (tmqUpdateEp(tmq, head->epoch, &rsp)) {
X
Xiaoyu Wang 已提交
1038
      atomic_store_64(&tmq->status, TMQ_CONSUMER_STATUS__READY);
1039
    }
X
Xiaoyu Wang 已提交
1040 1041
    tDeleteSMqCMGetSubEpRsp(&rsp);
  } else {
L
Liu Jicong 已提交
1042 1043 1044
    /*SMqCMGetSubEpRsp* pRsp = taosAllocateQitem(sizeof(SMqCMGetSubEpRsp));*/
    SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper));
    if (pWrapper == NULL) {
X
Xiaoyu Wang 已提交
1045
      terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
1046 1047
      code = -1;
      goto END;
X
Xiaoyu Wang 已提交
1048
    }
L
Liu Jicong 已提交
1049 1050 1051 1052
    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pWrapper->epoch = head->epoch;
    memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
    tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);
L
Liu Jicong 已提交
1053

L
Liu Jicong 已提交
1054
    taosWriteQitem(tmq->mqueue, pWrapper);
L
temp  
Liu Jicong 已提交
1055
    /*tsem_post(&tmq->rspSem);*/
1056
  }
L
Liu Jicong 已提交
1057 1058

END:
L
fix txn  
Liu Jicong 已提交
1059
  atomic_store_8(&tmq->epStatus, 0);
L
Liu Jicong 已提交
1060 1061 1062 1063
  if (pParam->sync) {
    tsem_post(&pParam->rspSem);
  }
  return code;
1064 1065
}

X
Xiaoyu Wang 已提交
1066
int32_t tmqAskEp(tmq_t* tmq, bool sync) {
L
fix txn  
Liu Jicong 已提交
1067 1068
  int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
  if (epStatus == 1) {
L
temp  
Liu Jicong 已提交
1069
    int32_t epSkipCnt = atomic_add_fetch_32(&tmq->epSkipCnt, 1);
L
Liu Jicong 已提交
1070
    tscTrace("consumer %ld skip ask ep cnt %d", tmq->consumerId, epSkipCnt);
L
Liu Jicong 已提交
1071
    if (epSkipCnt < 5000) return 0;
L
fix txn  
Liu Jicong 已提交
1072
  }
L
temp  
Liu Jicong 已提交
1073
  atomic_store_32(&tmq->epSkipCnt, 0);
L
Liu Jicong 已提交
1074
  int32_t           tlen = sizeof(SMqCMGetSubEpReq);
wafwerar's avatar
wafwerar 已提交
1075
  SMqCMGetSubEpReq* req = taosMemoryMalloc(tlen);
L
Liu Jicong 已提交
1076
  if (req == NULL) {
L
Liu Jicong 已提交
1077
    tscError("failed to malloc get subscribe ep buf");
L
add log  
Liu Jicong 已提交
1078
    atomic_store_8(&tmq->epStatus, 0);
L
Liu Jicong 已提交
1079
    return -1;
L
Liu Jicong 已提交
1080
  }
L
Liu Jicong 已提交
1081 1082 1083
  req->consumerId = htobe64(tmq->consumerId);
  req->epoch = htonl(tmq->epoch);
  strcpy(req->cgroup, tmq->groupId);
1084

wafwerar's avatar
wafwerar 已提交
1085
  SMqAskEpCbParam* pParam = taosMemoryMalloc(sizeof(SMqAskEpCbParam));
L
Liu Jicong 已提交
1086 1087
  if (pParam == NULL) {
    tscError("failed to malloc subscribe param");
wafwerar's avatar
wafwerar 已提交
1088
    taosMemoryFree(req);
L
add log  
Liu Jicong 已提交
1089
    atomic_store_8(&tmq->epStatus, 0);
L
Liu Jicong 已提交
1090
    return -1;
L
Liu Jicong 已提交
1091 1092
  }
  pParam->tmq = tmq;
X
Xiaoyu Wang 已提交
1093 1094
  pParam->sync = sync;
  tsem_init(&pParam->rspSem, 0, 0);
1095

wafwerar's avatar
wafwerar 已提交
1096
  SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo));
L
Liu Jicong 已提交
1097 1098
  if (sendInfo == NULL) {
    tsem_destroy(&pParam->rspSem);
wafwerar's avatar
wafwerar 已提交
1099 1100
    taosMemoryFree(pParam);
    taosMemoryFree(req);
L
add log  
Liu Jicong 已提交
1101
    atomic_store_8(&tmq->epStatus, 0);
L
Liu Jicong 已提交
1102 1103 1104 1105 1106 1107 1108 1109 1110 1111
    return -1;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = req,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
L
Liu Jicong 已提交
1112 1113 1114
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqAskEpCb;
L
Liu Jicong 已提交
1115
  sendInfo->msgType = TDMT_MND_GET_SUB_EP;
1116

L
Liu Jicong 已提交
1117 1118
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

L
add log  
Liu Jicong 已提交
1119 1120
  tscDebug("consumer %ld ask ep", tmq->consumerId);

L
Liu Jicong 已提交
1121 1122
  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
1123

X
Xiaoyu Wang 已提交
1124
  if (sync) tsem_wait(&pParam->rspSem);
L
Liu Jicong 已提交
1125
  return 0;
1126 1127
}

L
Liu Jicong 已提交
1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141
tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
  const SMqOffset* pOffset = &offset->offset;
  if (strcmp(pOffset->cgroup, tmq->groupId) != 0) {
    return TMQ_RESP_ERR__FAIL;
  }
  int32_t sz = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < sz; i++) {
    SMqClientTopic* clientTopic = taosArrayGet(tmq->clientTopics, i);
    if (strcmp(clientTopic->topicName, pOffset->topicName) == 0) {
      int32_t vgSz = taosArrayGetSize(clientTopic->vgs);
      for (int32_t j = 0; j < vgSz; j++) {
        SMqClientVg* pVg = taosArrayGet(clientTopic->vgs, j);
        if (pVg->vgId == pOffset->vgId) {
          pVg->currentOffset = pOffset->offset;
L
Liu Jicong 已提交
1142
          tmqClearUnhandleMsg(tmq);
L
Liu Jicong 已提交
1143 1144 1145 1146 1147 1148 1149 1150
          return TMQ_RESP_ERR__SUCCESS;
        }
      }
    }
  }
  return TMQ_RESP_ERR__FAIL;
}

L
Liu Jicong 已提交
1151
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTopic* pTopic, SMqClientVg* pVg) {
L
Liu Jicong 已提交
1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162
  int64_t reqOffset;
  if (pVg->currentOffset >= 0) {
    reqOffset = pVg->currentOffset;
  } else {
    if (tmq->resetOffsetCfg == TMQ_CONF__RESET_OFFSET__NONE) {
      tscError("unable to poll since no committed offset but reset offset is set to none");
      return NULL;
    }
    reqOffset = tmq->resetOffsetCfg;
  }

wafwerar's avatar
wafwerar 已提交
1163
  SMqPollReq* pReq = taosMemoryMalloc(sizeof(SMqPollReq));
1164 1165 1166
  if (pReq == NULL) {
    return NULL;
  }
L
Liu Jicong 已提交
1167

L
Liu Jicong 已提交
1168
  strcpy(pReq->topic, pTopic->topicName);
1169 1170
  strcpy(pReq->cgroup, tmq->groupId);

L
fix  
Liu Jicong 已提交
1171
  pReq->blockingTime = blockingTime;
L
Liu Jicong 已提交
1172
  pReq->consumerId = tmq->consumerId;
X
Xiaoyu Wang 已提交
1173
  pReq->epoch = tmq->epoch;
L
Liu Jicong 已提交
1174
  pReq->currentOffset = reqOffset;
L
Liu Jicong 已提交
1175
  pReq->reqId = generateRequestId();
1176 1177

  pReq->head.vgId = htonl(pVg->vgId);
L
Liu Jicong 已提交
1178
  pReq->head.contLen = htonl(sizeof(SMqPollReq));
1179 1180 1181
  return pReq;
}

L
Liu Jicong 已提交
1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  pRspObj->resType = RES_TYPE__TMQ;
  pRspObj->topic = strdup(pWrapper->topicHandle->topicName);
  pRspObj->resIter = -1;
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  SMqPollRspV2* pRsp = &pWrapper->msg;
  int32_t       blockNum = taosArrayGetSize(pRsp->blockPos);
  pRspObj->res = taosArrayInit(blockNum, sizeof(SReqResultInfo));
  for (int32_t i = 0; i < blockNum; i++) {
    int32_t            pos = *(int32_t*)taosArrayGet(pRsp->blockPos, i);
    SRetrieveTableRsp* pRetrieve = POINTER_SHIFT(pRsp->blockData, pos);
    SReqResultInfo     resInfo;
    setQueryResultFromRsp(&resInfo, pRetrieve, true);
    taosArrayPush(pRspObj->res, &resInfo);
  }
  return pRspObj;
}

L
Liu Jicong 已提交
1201
#if 0
L
Liu Jicong 已提交
1202 1203 1204 1205 1206 1207 1208 1209 1210 1211
tmq_message_t* tmqSyncPollImpl(tmq_t* tmq, int64_t blockingTime) {
  tmq_message_t* msg = NULL;
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      int32_t      vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
      /*if (vgStatus != TMQ_VG_STATUS__IDLE) {*/
      /*continue;*/
      /*}*/
L
Liu Jicong 已提交
1212
      SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg);
L
Liu Jicong 已提交
1213 1214 1215 1216 1217
      if (pReq == NULL) {
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        // TODO: out of mem
        return NULL;
      }
X
Xiaoyu Wang 已提交
1218

wafwerar's avatar
wafwerar 已提交
1219
      SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
L
Liu Jicong 已提交
1220
      if (pParam == NULL) {
L
Liu Jicong 已提交
1221 1222 1223 1224
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        // TODO: out of mem
        return NULL;
      }
L
Liu Jicong 已提交
1225 1226 1227 1228 1229 1230 1231
      pParam->tmq = tmq;
      pParam->pVg = pVg;
      pParam->epoch = tmq->epoch;
      pParam->sync = 1;
      pParam->msg = &msg;
      tsem_init(&pParam->rspSem, 0, 0);

wafwerar's avatar
wafwerar 已提交
1232
      SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo));
L
Liu Jicong 已提交
1233 1234 1235 1236 1237
      if (sendInfo == NULL) {
        return NULL;
      }

      sendInfo->msgInfo = (SDataBuf){
L
Liu Jicong 已提交
1238
          .pData = pReq,
L
Liu Jicong 已提交
1239
          .len = sizeof(SMqPollReq),
L
Liu Jicong 已提交
1240 1241
          .handle = NULL,
      };
L
Liu Jicong 已提交
1242
      sendInfo->requestId = generateRequestId();
L
Liu Jicong 已提交
1243
      sendInfo->requestObjRefId = 0;
L
Liu Jicong 已提交
1244
      sendInfo->param = pParam;
L
Liu Jicong 已提交
1245
      sendInfo->fp = tmqPollCb;
L
Liu Jicong 已提交
1246
      sendInfo->msgType = TDMT_VND_CONSUME;
L
Liu Jicong 已提交
1247 1248 1249 1250 1251 1252 1253 1254

      int64_t transporterId = 0;
      /*printf("send poll\n");*/
      atomic_add_fetch_32(&tmq->waitingRequest, 1);
      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
      pVg->pollCnt++;
      tmq->pollCnt++;

L
Liu Jicong 已提交
1255
      tsem_wait(&pParam->rspSem);
L
Liu Jicong 已提交
1256 1257 1258 1259
      tmq_message_t* nmsg = NULL;
      while (1) {
        taosReadQitem(tmq->mqueue, (void**)&nmsg);
        if (nmsg == NULL) continue;
L
Liu Jicong 已提交
1260 1261 1262
        while (nmsg->head.mqMsgType != TMQ_MSG_TYPE__POLL_RSP) {
          taosReadQitem(tmq->mqueue, (void**)&nmsg);
        }
L
Liu Jicong 已提交
1263 1264 1265
        return nmsg;
      }
    }
X
Xiaoyu Wang 已提交
1266
  }
L
Liu Jicong 已提交
1267
  return NULL;
X
Xiaoyu Wang 已提交
1268
}
L
Liu Jicong 已提交
1269
#endif
X
Xiaoyu Wang 已提交
1270 1271

int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
L
fix  
Liu Jicong 已提交
1272
  /*printf("call poll\n");*/
X
Xiaoyu Wang 已提交
1273 1274 1275 1276 1277 1278
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      int32_t      vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
      if (vgStatus != TMQ_VG_STATUS__IDLE) {
L
Liu Jicong 已提交
1279
        int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
L
Liu Jicong 已提交
1280
        tscTrace("consumer %ld epoch %d skip vg %d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId, vgSkipCnt);
X
Xiaoyu Wang 已提交
1281
        continue;
L
Liu Jicong 已提交
1282
        /*if (vgSkipCnt < 10000) continue;*/
L
temp  
Liu Jicong 已提交
1283 1284 1285 1286 1287 1288 1289
#if 0
        if (skipCnt < 30000) {
          continue;
        } else {
        tscDebug("consumer %ld skip vg %d skip too much reset", tmq->consumerId, pVg->vgId);
        }
#endif
X
Xiaoyu Wang 已提交
1290
      }
L
Liu Jicong 已提交
1291
      atomic_store_32(&pVg->vgSkipCnt, 0);
L
Liu Jicong 已提交
1292
      SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg);
X
Xiaoyu Wang 已提交
1293 1294
      if (pReq == NULL) {
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
L
temp  
Liu Jicong 已提交
1295
        /*tsem_post(&tmq->rspSem);*/
X
Xiaoyu Wang 已提交
1296 1297
        return -1;
      }
wafwerar's avatar
wafwerar 已提交
1298
      SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
L
Liu Jicong 已提交
1299
      if (pParam == NULL) {
wafwerar's avatar
wafwerar 已提交
1300
        taosMemoryFree(pReq);
X
Xiaoyu Wang 已提交
1301
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
L
temp  
Liu Jicong 已提交
1302
        /*tsem_post(&tmq->rspSem);*/
X
Xiaoyu Wang 已提交
1303 1304
        return -1;
      }
L
Liu Jicong 已提交
1305 1306
      pParam->tmq = tmq;
      pParam->pVg = pVg;
L
Liu Jicong 已提交
1307
      pParam->pTopic = pTopic;
L
Liu Jicong 已提交
1308
      pParam->vgId = pVg->vgId;
L
Liu Jicong 已提交
1309 1310 1311
      pParam->epoch = tmq->epoch;
      pParam->sync = 0;

wafwerar's avatar
wafwerar 已提交
1312
      SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo));
L
Liu Jicong 已提交
1313
      if (sendInfo == NULL) {
wafwerar's avatar
wafwerar 已提交
1314 1315
        taosMemoryFree(pReq);
        taosMemoryFree(pParam);
L
Liu Jicong 已提交
1316
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
L
temp  
Liu Jicong 已提交
1317
        /*tsem_post(&tmq->rspSem);*/
L
Liu Jicong 已提交
1318 1319 1320 1321
        return -1;
      }

      sendInfo->msgInfo = (SDataBuf){
X
Xiaoyu Wang 已提交
1322
          .pData = pReq,
L
Liu Jicong 已提交
1323
          .len = sizeof(SMqPollReq),
X
Xiaoyu Wang 已提交
1324 1325
          .handle = NULL,
      };
L
Liu Jicong 已提交
1326
      sendInfo->requestId = pReq->reqId;
X
Xiaoyu Wang 已提交
1327
      sendInfo->requestObjRefId = 0;
L
Liu Jicong 已提交
1328
      sendInfo->param = pParam;
X
Xiaoyu Wang 已提交
1329
      sendInfo->fp = tmqPollCb;
L
Liu Jicong 已提交
1330
      sendInfo->msgType = TDMT_VND_CONSUME;
X
Xiaoyu Wang 已提交
1331 1332

      int64_t transporterId = 0;
L
fix  
Liu Jicong 已提交
1333 1334
      /*printf("send poll\n");*/
      atomic_add_fetch_32(&tmq->waitingRequest, 1);
L
Liu Jicong 已提交
1335 1336
      tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %ld, reqId %lu", tmq->consumerId,
               pTopic->topicName, pVg->vgId, tmq->epoch, pVg->currentOffset, pReq->reqId);
L
fix  
Liu Jicong 已提交
1337
      /*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/
X
Xiaoyu Wang 已提交
1338 1339 1340 1341 1342 1343 1344 1345
      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
      pVg->pollCnt++;
      tmq->pollCnt++;
    }
  }
  return 0;
}

L
Liu Jicong 已提交
1346 1347
int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
L
fix  
Liu Jicong 已提交
1348
    /*printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);*/
L
Liu Jicong 已提交
1349 1350 1351 1352
    if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) {
      SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
      SMqCMGetSubEpRsp*   rspMsg = &pEpRspWrapper->msg;
      tmqUpdateEp(tmq, rspWrapper->epoch, rspMsg);
L
temp  
Liu Jicong 已提交
1353
      /*tmqClearUnhandleMsg(tmq);*/
X
Xiaoyu Wang 已提交
1354 1355 1356 1357 1358 1359 1360 1361 1362 1363
      *pReset = true;
    } else {
      *pReset = false;
    }
  } else {
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
1364
SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) {
X
Xiaoyu Wang 已提交
1365
  while (1) {
L
Liu Jicong 已提交
1366 1367 1368
    SMqRspWrapper* rspWrapper = NULL;
    taosGetQitem(tmq->qall, (void**)&rspWrapper);
    if (rspWrapper == NULL) {
L
Liu Jicong 已提交
1369
      taosReadAllQitems(tmq->mqueue, tmq->qall);
L
Liu Jicong 已提交
1370 1371
      taosGetQitem(tmq->qall, (void**)&rspWrapper);
      if (rspWrapper == NULL) return NULL;
X
Xiaoyu Wang 已提交
1372 1373
    }

L
Liu Jicong 已提交
1374 1375
    if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
      SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
L
Liu Jicong 已提交
1376
      atomic_sub_fetch_32(&tmq->readyRequest, 1);
L
fix  
Liu Jicong 已提交
1377
      /*printf("handle poll rsp %d\n", rspMsg->head.mqMsgType);*/
L
Liu Jicong 已提交
1378
      if (pollRspWrapper->msg.head.epoch == atomic_load_32(&tmq->epoch)) {
L
fix  
Liu Jicong 已提交
1379
        /*printf("epoch match\n");*/
L
Liu Jicong 已提交
1380
        SMqClientVg* pVg = pollRspWrapper->vgHandle;
L
Liu Jicong 已提交
1381
        /*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/
L
Liu Jicong 已提交
1382
        pVg->currentOffset = pollRspWrapper->msg.rspOffset;
X
Xiaoyu Wang 已提交
1383
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
L
Liu Jicong 已提交
1384 1385 1386
        if (pollRspWrapper->msg.dataLen == 0) {
          taosFreeQitem(pollRspWrapper);
          rspWrapper = NULL;
L
temp  
Liu Jicong 已提交
1387 1388
          continue;
        }
L
Liu Jicong 已提交
1389 1390 1391
        // build msg
        SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
        return pRsp;
X
Xiaoyu Wang 已提交
1392
      } else {
L
Liu Jicong 已提交
1393
        /*printf("epoch mismatch\n");*/
L
Liu Jicong 已提交
1394
        taosFreeQitem(pollRspWrapper);
X
Xiaoyu Wang 已提交
1395 1396
      }
    } else {
L
fix  
Liu Jicong 已提交
1397
      /*printf("handle ep rsp %d\n", rspMsg->head.mqMsgType);*/
X
Xiaoyu Wang 已提交
1398
      bool reset = false;
L
Liu Jicong 已提交
1399 1400
      tmqHandleNoPollRsp(tmq, rspWrapper, &reset);
      taosFreeQitem(rspWrapper);
X
Xiaoyu Wang 已提交
1401
      if (pollIfReset && reset) {
L
add log  
Liu Jicong 已提交
1402
        tscDebug("consumer %ld reset and repoll", tmq->consumerId);
X
Xiaoyu Wang 已提交
1403 1404 1405 1406 1407 1408
        tmqPollImpl(tmq, blockingTime);
      }
    }
  }
}

L
Liu Jicong 已提交
1409
#if 0
L
Liu Jicong 已提交
1410
tmq_message_t* tmq_consumer_poll_v1(tmq_t* tmq, int64_t blocking_time) {
X
Xiaoyu Wang 已提交
1411 1412
  tmq_message_t* rspMsg = NULL;
  int64_t        startTime = taosGetTimestampMs();
1413 1414

  int64_t status = atomic_load_64(&tmq->status);
X
Xiaoyu Wang 已提交
1415 1416
  tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT);

L
Liu Jicong 已提交
1417 1418 1419 1420 1421 1422 1423 1424 1425 1426 1427 1428 1429
  while (1) {
    rspMsg = tmqSyncPollImpl(tmq, blocking_time);
    if (rspMsg && rspMsg->consumeRsp.numOfTopics) {
      return rspMsg;
    }

    if (blocking_time != 0) {
      int64_t endTime = taosGetTimestampMs();
      if (endTime - startTime > blocking_time) {
        return NULL;
      }
    } else
      return NULL;
X
Xiaoyu Wang 已提交
1430
  }
L
Liu Jicong 已提交
1431
}
L
Liu Jicong 已提交
1432
#endif
X
Xiaoyu Wang 已提交
1433

L
Liu Jicong 已提交
1434 1435 1436
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
  SMqRspObj* rspObj;
  int64_t    startTime = taosGetTimestampMs();
L
Liu Jicong 已提交
1437

L
Liu Jicong 已提交
1438
  // TODO: put into another thread or delayed queue
1439
  int64_t status = atomic_load_64(&tmq->status);
L
Liu Jicong 已提交
1440 1441
  tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT);

L
Liu Jicong 已提交
1442 1443 1444
  rspObj = tmqHandleAllRsp(tmq, blocking_time, false);
  if (rspObj) {
    return (TAOS_RES*)rspObj;
L
fix  
Liu Jicong 已提交
1445
  }
X
Xiaoyu Wang 已提交
1446 1447 1448

  while (1) {
    /*printf("cycle\n");*/
L
Liu Jicong 已提交
1449
    tmqAskEp(tmq, false);
L
Liu Jicong 已提交
1450
    tmqPollImpl(tmq, blocking_time);
L
Liu Jicong 已提交
1451

L
temp  
Liu Jicong 已提交
1452
    /*tsem_wait(&tmq->rspSem);*/
L
Liu Jicong 已提交
1453

L
Liu Jicong 已提交
1454 1455 1456
    rspObj = tmqHandleAllRsp(tmq, blocking_time, false);
    if (rspObj) {
      return (TAOS_RES*)rspObj;
X
Xiaoyu Wang 已提交
1457 1458 1459 1460
    }
    if (blocking_time != 0) {
      int64_t endTime = taosGetTimestampMs();
      if (endTime - startTime > blocking_time) {
L
Liu Jicong 已提交
1461
        tscDebug("consumer %ld (epoch %d) timeout, no rsp", tmq->consumerId, tmq->epoch);
X
Xiaoyu Wang 已提交
1462 1463 1464 1465 1466 1467 1468
        return NULL;
      }
    }
  }
}

#if 0
1469

L
Liu Jicong 已提交
1470
  if (blocking_time <= 0) blocking_time = 1;
L
Liu Jicong 已提交
1471 1472
  if (blocking_time > 1000) blocking_time = 1000;
  /*blocking_time = 1;*/
1473 1474 1475

  if (taosArrayGetSize(tmq->clientTopics) == 0) {
    tscDebug("consumer:%ld poll but not assigned", tmq->consumerId);
L
Liu Jicong 已提交
1476
    /*printf("over1\n");*/
wafwerar's avatar
wafwerar 已提交
1477
    taosMsleep(blocking_time);
1478 1479 1480 1481
    return NULL;
  }
  SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, tmq->nextTopicIdx);
  if (taosArrayGetSize(pTopic->vgs) == 0) {
L
Liu Jicong 已提交
1482
    /*printf("over2\n");*/
wafwerar's avatar
wafwerar 已提交
1483
    taosMsleep(blocking_time);
1484 1485 1486
    return NULL;
  }

L
Liu Jicong 已提交
1487
  tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics);
L
Liu Jicong 已提交
1488
  int32_t beginVgIdx = pTopic->nextVgIdx;
L
Liu Jicong 已提交
1489
  while (1) {
L
Liu Jicong 已提交
1490 1491 1492
    pTopic->nextVgIdx = (pTopic->nextVgIdx + 1) % taosArrayGetSize(pTopic->vgs);
    SMqClientVg* pVg = taosArrayGet(pTopic->vgs, pTopic->nextVgIdx);
    /*printf("consume vg %d, offset %ld\n", pVg->vgId, pVg->currentOffset);*/
L
Liu Jicong 已提交
1493
    SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blocking_time, pTopic, pVg);
L
Liu Jicong 已提交
1494 1495
    if (pReq == NULL) {
      ASSERT(false);
wafwerar's avatar
wafwerar 已提交
1496
      taosMsleep(blocking_time);
L
Liu Jicong 已提交
1497 1498
      return NULL;
    }
1499

wafwerar's avatar
wafwerar 已提交
1500
    SMqPollCbParam* param = taosMemoryMalloc(sizeof(SMqPollCbParam));
L
Liu Jicong 已提交
1501 1502
    if (param == NULL) {
      ASSERT(false);
wafwerar's avatar
wafwerar 已提交
1503
      taosMsleep(blocking_time);
L
Liu Jicong 已提交
1504 1505 1506 1507 1508 1509 1510 1511
      return NULL;
    }
    param->tmq = tmq;
    param->retMsg = &tmq_message;
    param->pVg = pVg;
    tsem_init(&param->rspSem, 0, 0);

    SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME);
X
Xiaoyu Wang 已提交
1512 1513 1514 1515 1516
    pRequest->body.requestMsg = (SDataBuf){
        .pData = pReq,
        .len = sizeof(SMqConsumeReq),
        .handle = NULL,
    };
1517

L
Liu Jicong 已提交
1518 1519 1520 1521 1522 1523 1524 1525 1526 1527 1528 1529 1530
    SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
    sendInfo->requestObjRefId = 0;
    sendInfo->param = param;
    sendInfo->fp = tmqPollCb;

    /*printf("req offset: %ld\n", pReq->offset);*/

    int64_t transporterId = 0;
    asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
    tmq->pollCnt++;

    tsem_wait(&param->rspSem);
    tsem_destroy(&param->rspSem);
wafwerar's avatar
wafwerar 已提交
1531
    taosMemoryFree(param);
L
Liu Jicong 已提交
1532 1533 1534

    if (tmq_message == NULL) {
      if (beginVgIdx == pTopic->nextVgIdx) {
wafwerar's avatar
wafwerar 已提交
1535
        taosMsleep(blocking_time);
L
Liu Jicong 已提交
1536 1537 1538 1539
      } else {
        continue;
      }
    }
L
Liu Jicong 已提交
1540

L
Liu Jicong 已提交
1541
    return tmq_message;
L
Liu Jicong 已提交
1542
  }
1543 1544 1545 1546

  /*tsem_wait(&pRequest->body.rspSem);*/

  /*if (body != NULL) {*/
L
Liu Jicong 已提交
1547
  /*destroySendMsgInfo(body);*/
1548 1549 1550
  /*}*/

  /*if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {*/
L
Liu Jicong 已提交
1551
  /*pRequest->code = terrno;*/
1552 1553 1554 1555
  /*}*/

  /*return pRequest;*/
}
X
Xiaoyu Wang 已提交
1556
#endif
1557

L
Liu Jicong 已提交
1558
#if 0
L
Liu Jicong 已提交
1559
tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async) {
L
Liu Jicong 已提交
1560
  if (tmq_topic_vgroup_list != NULL) {
L
Liu Jicong 已提交
1561
    // TODO
L
Liu Jicong 已提交
1562 1563
  }

L
Liu Jicong 已提交
1564
  // TODO: change semaphore to gate
L
Liu Jicong 已提交
1565 1566 1567
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
L
Liu Jicong 已提交
1568
      SMqClientVg*   pVg = taosArrayGet(pTopic->vgs, j);
L
Liu Jicong 已提交
1569
      SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, 0, pTopic, pVg);
L
Liu Jicong 已提交
1570

L
Liu Jicong 已提交
1571 1572
      SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME);
      pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq)};
wafwerar's avatar
wafwerar 已提交
1573
      SMqCommitCbParam* pParam = taosMemoryMalloc(sizeof(SMqCommitCbParam));
L
Liu Jicong 已提交
1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588 1589 1590 1591 1592 1593
      if (pParam == NULL) {
        continue;
      }
      pParam->tmq = tmq;
      pParam->pVg = pVg;
      pParam->async = async;
      if (!async) tsem_init(&pParam->rspSem, 0, 0);

      SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
      sendInfo->requestObjRefId = 0;
      sendInfo->param = pParam;
      sendInfo->fp = tmqCommitCb;

      int64_t transporterId = 0;
      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);

      if (!async) tsem_wait(&pParam->rspSem);
    }
  }

L
Liu Jicong 已提交
1594
  return 0;
1595
}
L
Liu Jicong 已提交
1596
#endif
1597

L
Liu Jicong 已提交
1598
#if 0
1599 1600
void tmq_message_destroy(tmq_message_t* tmq_message) {
  if (tmq_message == NULL) return;
L
Liu Jicong 已提交
1601
  SMqPollRsp* pRsp = &tmq_message->msg;
L
Liu Jicong 已提交
1602
  tDeleteSMqConsumeRsp(pRsp);
wafwerar's avatar
wafwerar 已提交
1603
  /*taosMemoryFree(tmq_message);*/
X
Xiaoyu Wang 已提交
1604
  taosFreeQitem(tmq_message);
1605
}
L
Liu Jicong 已提交
1606
#endif
1607

L
Liu Jicong 已提交
1608
tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { return TMQ_RESP_ERR__SUCCESS; }
L
Liu Jicong 已提交
1609 1610 1611 1612 1613 1614 1615

const char* tmq_err2str(tmq_resp_err_t err) {
  if (err == TMQ_RESP_ERR__SUCCESS) {
    return "success";
  }
  return "fail";
}
L
Liu Jicong 已提交
1616

L
Liu Jicong 已提交
1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640
char* tmq_get_topic_name(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
    return pRspObj->topic;
  } else {
    return NULL;
  }
}

int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
    return pRspObj->vgId;
  } else {
    return -1;
  }
}

void tmq_message_destroy(TAOS_RES* res) {
  if (res == NULL) return;
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
  }
}