tmq.c 48.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
#include "planner.h"
H
Haojun Liao 已提交
20
#include "tdatablock.h"
21 22 23
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
X
Xiaoyu Wang 已提交
24
#include "tqueue.h"
25 26
#include "tref.h"

L
Liu Jicong 已提交
27 28 29 30
/*
 * A poll response together with per-message bookkeeping.
 * NOTE(review): `resIter` is presumably a cursor into `res`; confirm against
 * the code that consumes this struct.
 */
struct tmq_message_t {
  SMqPollRsp msg;
  char*      topic;
  SArray*    res;  // SArray<SReqResultInfo>
  int32_t    vgId;
  int32_t    resIter;
};

L
Liu Jicong 已提交
35 36 37 38 39 40 41 42 43 44 45
/*
 * Common prefix of every item placed on the consumer queue: a response type
 * tag followed by an epoch. tmqClearUnhandleMsg() handles queue items through
 * this type.
 */
typedef struct {
  int8_t  tmqRspType;
  int32_t epoch;
} SMqRspWrapper;

/*
 * Queue item carrying an SMqCMGetSubEpRsp payload. Starts with the same two
 * fields as SMqRspWrapper.
 */
typedef struct {
  int8_t           tmqRspType;
  int32_t          epoch;
  SMqCMGetSubEpRsp msg;
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
46
struct tmq_list_t {
L
Liu Jicong 已提交
47
  SArray container;
L
Liu Jicong 已提交
48
};
L
Liu Jicong 已提交
49

L
Liu Jicong 已提交
50
struct tmq_topic_vgroup_t {
L
Liu Jicong 已提交
51
  SMqOffset offset;
L
Liu Jicong 已提交
52 53 54
};

struct tmq_topic_vgroup_list_t {
L
Liu Jicong 已提交
55 56
  int32_t             cnt;
  int32_t             size;
L
Liu Jicong 已提交
57 58 59 60
  tmq_topic_vgroup_t* elems;
};

struct tmq_conf_t {
L
Liu Jicong 已提交
61
  char           clientId[256];
L
Liu Jicong 已提交
62
  char           groupId[TSDB_CGROUP_LEN];
L
Liu Jicong 已提交
63
  int8_t         autoCommit;
L
Liu Jicong 已提交
64
  int8_t         resetOffset;
L
Liu Jicong 已提交
65 66 67 68 69
  uint16_t       port;
  char*          ip;
  char*          user;
  char*          pass;
  char*          db;
L
Liu Jicong 已提交
70
  tmq_commit_cb* commit_cb;
L
Liu Jicong 已提交
71 72 73
};

struct tmq_t {
L
Liu Jicong 已提交
74
  // conf
L
Liu Jicong 已提交
75
  char           groupId[TSDB_CGROUP_LEN];
L
Liu Jicong 已提交
76
  char           clientId[256];
L
Liu Jicong 已提交
77
  int8_t         autoCommit;
L
Liu Jicong 已提交
78
  int8_t         inWaiting;
L
Liu Jicong 已提交
79
  int64_t        consumerId;
L
Liu Jicong 已提交
80
  int32_t        epoch;
L
Liu Jicong 已提交
81
  int32_t        resetOffsetCfg;
L
Liu Jicong 已提交
82 83 84 85
  int64_t        status;
  STscObj*       pTscObj;
  tmq_commit_cb* commit_cb;
  int32_t        nextTopicIdx;
L
fix txn  
Liu Jicong 已提交
86
  int8_t         epStatus;
L
temp  
Liu Jicong 已提交
87
  int32_t        epSkipCnt;
L
fix  
Liu Jicong 已提交
88
  int32_t        waitingRequest;
L
Liu Jicong 已提交
89
  int32_t        readyRequest;
L
Liu Jicong 已提交
90
  SArray*        clientTopics;  // SArray<SMqClientTopic>
X
Xiaoyu Wang 已提交
91 92
  STaosQueue*    mqueue;        // queue of tmq_message_t
  STaosQall*     qall;
L
Liu Jicong 已提交
93
  tsem_t         rspSem;
L
Liu Jicong 已提交
94 95
  // stat
  int64_t pollCnt;
L
Liu Jicong 已提交
96 97
};

X
Xiaoyu Wang 已提交
98 99 100 101 102 103 104 105
/* Vgroup status values. */
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT,
};

/* Consumer status values. */
enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY,
};

L
Liu Jicong 已提交
108
typedef struct {
109 110 111 112
  // statistics
  int64_t pollCnt;
  // offset
  int64_t currentOffset;
L
Liu Jicong 已提交
113
  // connection info
114
  int32_t vgId;
X
Xiaoyu Wang 已提交
115
  int32_t vgStatus;
L
Liu Jicong 已提交
116
  int32_t vgSkipCnt;
117 118 119
  SEpSet  epSet;
} SMqClientVg;

L
Liu Jicong 已提交
120
typedef struct {
121
  // subscribe info
L
Liu Jicong 已提交
122 123 124 125 126 127 128 129
  int32_t        sqlLen;
  char*          sql;
  char*          topicName;
  int64_t        topicId;
  SArray*        vgs;  // SArray<SMqClientVg>
  int8_t         isSchemaAdaptive;
  int32_t        numOfFields;
  SSchemaWrapper schema;
130 131
} SMqClientTopic;

L
Liu Jicong 已提交
132 133 134 135 136 137 138 139
/*
 * Queue item built by tmqPollCb() for a poll response. Starts with the same
 * two fields as SMqRspWrapper and records the vgroup and topic the response
 * belongs to.
 */
typedef struct {
  int8_t          tmqRspType;
  int32_t         epoch;
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
  SMqPollRspV2    msg;
} SMqPollRspWrapper;

L
Liu Jicong 已提交
140
typedef struct {
L
Liu Jicong 已提交
141 142 143 144
  tmq_t*         tmq;
  tsem_t         rspSem;
  tmq_resp_err_t rspErr;
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
145

L
Liu Jicong 已提交
146
typedef struct {
147
  tmq_t*  tmq;
X
Xiaoyu Wang 已提交
148 149
  int32_t sync;
  tsem_t  rspSem;
150 151
} SMqAskEpCbParam;

L
Liu Jicong 已提交
152
typedef struct {
L
Liu Jicong 已提交
153 154
  tmq_t*          tmq;
  SMqClientVg*    pVg;
L
Liu Jicong 已提交
155
  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
156
  int32_t         epoch;
L
Liu Jicong 已提交
157
  int32_t         vgId;
L
Liu Jicong 已提交
158 159
  tsem_t          rspSem;
  int32_t         sync;
X
Xiaoyu Wang 已提交
160
} SMqPollCbParam;
161

L
Liu Jicong 已提交
162
typedef struct {
L
Liu Jicong 已提交
163
  tmq_t*         tmq;
L
Liu Jicong 已提交
164
  int32_t        async;
L
Liu Jicong 已提交
165 166
  tsem_t         rspSem;
  tmq_resp_err_t rspErr;
L
Liu Jicong 已提交
167
  /*SMqClientVg* pVg;*/
L
Liu Jicong 已提交
168
} SMqCommitCbParam;
L
Liu Jicong 已提交
169

170
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
171
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
L
Liu Jicong 已提交
172
  conf->autoCommit = false;
X
Xiaoyu Wang 已提交
173
  conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
174 175 176
  return conf;
}

L
Liu Jicong 已提交
177
/*
 * Releases a configuration created by tmq_conf_new(), including the heap
 * copies that tmq_conf_set() stores for the "td.connect.*" string keys.
 * Passing NULL is a no-op.
 */
void tmq_conf_destroy(tmq_conf_t* conf) {
  if (conf == NULL) return;

  // These four are strdup() copies owned by the configuration.
  if (conf->ip) taosMemoryFree(conf->ip);
  if (conf->user) taosMemoryFree(conf->user);
  if (conf->pass) taosMemoryFree(conf->pass);
  if (conf->db) taosMemoryFree(conf->db);

  taosMemoryFree(conf);
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
182 183
  if (strcmp(key, "group.id") == 0) {
    strcpy(conf->groupId, value);
L
Liu Jicong 已提交
184
    return TMQ_CONF_OK;
185
  }
L
Liu Jicong 已提交
186

187 188
  if (strcmp(key, "client.id") == 0) {
    strcpy(conf->clientId, value);
L
Liu Jicong 已提交
189 190
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
191

L
Liu Jicong 已提交
192 193
  if (strcmp(key, "enable.auto.commit") == 0) {
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
194
      conf->autoCommit = true;
L
Liu Jicong 已提交
195 196
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
197
      conf->autoCommit = false;
L
Liu Jicong 已提交
198 199 200 201
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
202
  }
L
Liu Jicong 已提交
203

L
Liu Jicong 已提交
204 205 206 207 208 209 210 211 212 213 214 215 216 217
  if (strcmp(key, "auto.offset.reset") == 0) {
    if (strcmp(value, "none") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "earliest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
218

L
Liu Jicong 已提交
219
  if (strcmp(key, "td.connect.ip") == 0) {
L
Liu Jicong 已提交
220 221 222
    conf->ip = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
223
  if (strcmp(key, "td.connect.user") == 0) {
L
Liu Jicong 已提交
224 225 226
    conf->user = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
227
  if (strcmp(key, "td.connect.pass") == 0) {
L
Liu Jicong 已提交
228 229 230
    conf->pass = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
231
  if (strcmp(key, "td.connect.port") == 0) {
L
Liu Jicong 已提交
232 233 234
    conf->port = atoi(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
235
  if (strcmp(key, "td.connect.db") == 0) {
L
Liu Jicong 已提交
236 237 238 239
    conf->db = strdup(value);
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
240
  return TMQ_CONF_UNKNOWN;
241 242 243
}

tmq_list_t* tmq_list_new() {
L
Liu Jicong 已提交
244 245
  //
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
246 247
}

L
Liu Jicong 已提交
248 249 250
/*
 * Appends a heap copy of `src` to the list; the caller keeps ownership of
 * `src`.
 *
 * Returns 0 on success and -1 when an argument is NULL, the copy cannot be
 * allocated, or the array push fails.
 */
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  if (list == NULL || src == NULL) return -1;

  SArray* container = &list->container;
  char*   topic = strdup(src);
  if (topic == NULL) return -1;

  if (taosArrayPush(container, &topic) == NULL) {
    // The array did not take the pointer, so the copy is still ours to free.
    taosMemoryFree(topic);
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
255
/*
 * Frees every topic string stored in the list and then the list itself.
 * Passing NULL is a no-op.
 */
void tmq_list_destroy(tmq_list_t* list) {
  if (list == NULL) return;

  SArray* container = &list->container;
  int32_t sz = taosArrayGetSize(container);

  // Each slot holds a char* pushed by tmq_list_append(); release the pointer
  // stored in the slot, not the slot's own address.
  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);
    if (topic) taosMemoryFree(topic);
  }
  taosArrayDestroy(container);
}

L
Liu Jicong 已提交
261 262 263 264
/*
 * Writes "<topicName>:<vg>" into `dst` and returns the number of characters
 * written, not counting the terminator. `dst` must be large enough; no bound
 * is applied.
 */
static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
  const int32_t written = sprintf(dst, "%s:%d", topicName, vg);
  return written;
}

L
Liu Jicong 已提交
265
/*
 * Discards every pending response: first the items already held in
 * tmq->qall, then everything still waiting in tmq->mqueue.
 */
void tmqClearUnhandleMsg(tmq_t* tmq) {
  SMqRspWrapper* pending = NULL;

  // Pass 0 drains what is already in qall; pass 1 moves mqueue into qall and
  // drains again.
  for (int32_t pass = 0; pass < 2; pass++) {
    pending = NULL;
    if (pass == 1) {
      taosReadAllQitems(tmq->mqueue, tmq->qall);
    }
    for (;;) {
      taosGetQitem(tmq->qall, (void**)&pending);
      if (pending == NULL) break;
      taosFreeQitem(pending);
    }
  }
}

L
Liu Jicong 已提交
286 287 288 289 290 291
/*
 * Response callback for the subscribe request: stores `code` in the
 * parameter's rspErr and posts its semaphore. `pMsg` is not used.
 * Always returns 0.
 */
int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) {
  SMqSubscribeCbParam* pSub = param;

  pSub->rspErr = code;
  tsem_post(&pSub->rspSem);

  return 0;
}
292

L
Liu Jicong 已提交
293
int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
L
Liu Jicong 已提交
294
  SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
L
Liu Jicong 已提交
295
  pParam->rspErr = code == 0 ? TMQ_RESP_ERR__SUCCESS : TMQ_RESP_ERR__FAIL;
L
Liu Jicong 已提交
296
  if (pParam->tmq->commit_cb) {
L
Liu Jicong 已提交
297
    pParam->tmq->commit_cb(pParam->tmq, pParam->rspErr, NULL, NULL);
L
Liu Jicong 已提交
298
  }
L
fix  
Liu Jicong 已提交
299
  if (!pParam->async) tsem_post(&pParam->rspSem);
L
Liu Jicong 已提交
300 301 302
  return 0;
}

X
Xiaoyu Wang 已提交
303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318
/*
 * Appends the names of the currently subscribed topics to `*topics`,
 * creating the list first when `*topics` is NULL. The caller owns the list.
 *
 * Returns TMQ_RESP_ERR__SUCCESS, or TMQ_RESP_ERR__FAIL when an argument is
 * NULL, the list cannot be created, or a name cannot be appended.
 */
tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  if (tmq == NULL || topics == NULL) {
    return TMQ_RESP_ERR__FAIL;
  }
  if (*topics == NULL) {
    *topics = tmq_list_new();
    if (*topics == NULL) {
      return TMQ_RESP_ERR__FAIL;
    }
  }

  int32_t sz = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < sz; i++) {
    // clientTopics stores SMqClientTopic structs by value, so take the
    // element's address rather than reading a pointer out of it.
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);
    // tmq_list_append() makes its own copy of the name.
    if (tmq_list_append(*topics, topic->topicName) != 0) {
      return TMQ_RESP_ERR__FAIL;
    }
  }
  return TMQ_RESP_ERR__SUCCESS;
}

/*
 * Drops all subscriptions by subscribing to an empty topic list.
 * Returns the result of tmq_subscribe(), or TMQ_RESP_ERR__FAIL when the
 * temporary list cannot be created.
 */
tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq) {
  tmq_list_t* lst = tmq_list_new();
  if (lst == NULL) {
    return TMQ_RESP_ERR__FAIL;
  }
  tmq_resp_err_t rsp = tmq_subscribe(tmq, lst);
  // tmq_subscribe() only reads the list, so it is still ours to release.
  tmq_list_destroy(lst);
  return rsp;
}

319
/*
 * Creates a consumer on top of an existing connection `conn`, copying the
 * settings from `conf`. `errstr` and `errstrLen` are not used.
 *
 * Returns NULL when the consumer or its topic array cannot be allocated.
 */
tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  tmq_t* consumer = taosMemoryCalloc(sizeof(tmq_t), 1);
  if (consumer == NULL) {
    return NULL;
  }

  // runtime state
  consumer->pTscObj = (STscObj*)conn;
  consumer->inWaiting = 0;
  consumer->status = 0;
  consumer->pollCnt = 0;
  consumer->epoch = 0;
  consumer->waitingRequest = 0;
  consumer->readyRequest = 0;
  consumer->epStatus = 0;
  consumer->epSkipCnt = 0;

  // settings taken from the configuration
  strcpy(consumer->clientId, conf->clientId);
  strcpy(consumer->groupId, conf->groupId);
  consumer->autoCommit = conf->autoCommit;
  consumer->commit_cb = conf->commit_cb;
  consumer->resetOffsetCfg = conf->resetOffset;

  // clear the top bit of the generated id
  consumer->consumerId = generateRequestId() & (((uint64_t)-1) >> 1);

  consumer->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  if (consumer->clientTopics == NULL) {
    taosMemoryFree(consumer);
    return NULL;
  }

  consumer->mqueue = taosOpenQueue();
  consumer->qall = taosAllocateQall();
  tsem_init(&consumer->rspSem, 0, 0);

  return consumer;
}

/*
 * Creates a consumer that opens its own connection from the "td.connect.*"
 * settings in `conf` (default user/password are used when unset; a database
 * must be configured). `errstr` and `errstrLen` are not used.
 *
 * Returns NULL when `conf` is NULL, an allocation fails, or the connection
 * cannot be established.
 */
tmq_t* tmq_consumer_new1(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  if (conf == NULL) {
    return NULL;
  }

  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    return NULL;
  }

  // Allocate the topic array before connecting so that neither failure path
  // below leaves a connection or the consumer behind.
  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  if (pTmq->clientTopics == NULL) {
    taosMemoryFree(pTmq);
    return NULL;
  }

  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  ASSERT(user);
  ASSERT(pass);
  ASSERT(conf->db);

  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, conf->db, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) {
    taosArrayDestroy(pTmq->clientTopics);
    taosMemoryFree(pTmq);
    return NULL;
  }

  pTmq->inWaiting = 0;
  pTmq->status = 0;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;
  pTmq->waitingRequest = 0;
  pTmq->readyRequest = 0;
  pTmq->epStatus = 0;
  pTmq->epSkipCnt = 0;
  // set conf
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
  pTmq->autoCommit = conf->autoCommit;
  pTmq->commit_cb = conf->commit_cb;
  pTmq->resetOffsetCfg = conf->resetOffset;

  // clear the top bit of the generated id
  pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1);

  // NOTE(review): the results of taosOpenQueue()/taosAllocateQall() are not
  // checked here; confirm whether they can fail.
  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();

  tsem_init(&pTmq->rspSem, 0, 0);

  return pTmq;
}

L
Liu Jicong 已提交
400 401 402 403
/*
 * Sends a commit-offset request to the mnode.
 *
 * offsets: entries to commit; when NULL, the current offset of every vgroup
 *          of every subscribed topic is committed.
 * async:   when zero, blocks until the response arrives and returns its
 *          result; otherwise returns right after the request is sent.
 *
 * Returns TMQ_RESP_ERR__SUCCESS, or TMQ_RESP_ERR__FAIL when a resource cannot
 * be allocated or (synchronous mode) the server reports a failure.
 */
tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) {
  // TODO: add read write lock
  tmq_resp_err_t    resp = TMQ_RESP_ERR__FAIL;
  SRequestObj*      pRequest = NULL;
  SMqCommitCbParam* pParam = NULL;
  SArray*           pArray = NULL;
  void*             buf = NULL;

  // build msg
  SMqCMCommitOffsetReq req;

  if (offsets == NULL) {
    pArray = taosArrayInit(0, sizeof(SMqOffset));
    if (pArray == NULL) {
      return TMQ_RESP_ERR__FAIL;
    }
    for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
      SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
      for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
        SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
        SMqOffset    offset;
        strcpy(offset.topicName, pTopic->topicName);
        strcpy(offset.cgroup, tmq->groupId);
        offset.vgId = pVg->vgId;
        offset.offset = pVg->currentOffset;
        taosArrayPush(pArray, &offset);
      }
    }
    req.num = pArray->size;
    req.offsets = pArray->pData;
  } else {
    req.num = offsets->cnt;
    req.offsets = (SMqOffset*)offsets->elems;
  }

  // First encoding pass without a buffer only measures the encoded length.
  SCoder encoder;
  tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
  tEncodeSMqCMCommitOffsetReq(&encoder, &req);
  int32_t tlen = encoder.pos;
  tCoderClear(&encoder);

  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    goto _cleanup;
  }

  tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, tlen, TD_ENCODER);
  tEncodeSMqCMCommitOffsetReq(&encoder, &req);
  tCoderClear(&encoder);

  // Allocate the callback parameter before the request so that a failure
  // here does not leave a request object behind.
  pParam = taosMemoryMalloc(sizeof(SMqCommitCbParam));
  if (pParam == NULL) {
    goto _cleanup;
  }

  // send to mnode
  pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_MQ_COMMIT_OFFSET);
  if (pRequest == NULL) {
    tscError("failed to malloc request");
    goto _cleanup;
  }

  pParam->tmq = tmq;
  pParam->async = async;
  pParam->rspErr = TMQ_RESP_ERR__SUCCESS;
  if (!async) {
    // The semaphore is only posted and waited on in synchronous mode.
    tsem_init(&pParam->rspSem, 0, 0);
  }

  pRequest->body.requestMsg = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };
  buf = NULL;  // now referenced by the request; must not be freed below

  SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
  if (sendInfo == NULL) {
    // NOTE(review): pRequest is not released on this path; the release
    // function is not visible from this part of the file.
    if (!async) tsem_destroy(&pParam->rspSem);
    goto _cleanup;
  }
  sendInfo->param = pParam;
  sendInfo->fp = tmqCommitCb;
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  if (async) {
    // tmqCommitCb() still dereferences the parameter when the response
    // arrives, so it must outlive this call.
    // NOTE(review): nothing releases it afterwards; it is leaked rather than
    // freed while still in use.
    pParam = NULL;
    resp = TMQ_RESP_ERR__SUCCESS;
  } else {
    tsem_wait(&pParam->rspSem);
    resp = pParam->rspErr;
    tsem_destroy(&pParam->rspSem);
  }

_cleanup:
  if (pParam) taosMemoryFree(pParam);
  if (buf) taosMemoryFree(buf);
  // The offsets were already encoded into the request buffer, so the
  // temporary array is no longer needed.
  if (pArray) taosArrayDestroy(pArray);

  return resp;
}

L
Liu Jicong 已提交
488
/*
 * Replaces the consumer's subscription with the topics in `topic_list` and
 * sends the subscribe request to the mnode, blocking until it is answered.
 * The list is only read; the caller keeps ownership of it.
 *
 * Returns the response code stored by tmqSubscribeCb(), or
 * TMQ_RESP_ERR__FAIL when the request could not be built.
 */
tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
  tmq_resp_err_t  resp = TMQ_RESP_ERR__FAIL;
  SRequestObj*    pRequest = NULL;
  SArray*         container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  char*           dbName = NULL;
  SCMSubscribeReq req = {0};

  // destroy ex
  // NOTE(review): the previous entries' topicName/vgs are not released here.
  taosArrayDestroy(tmq->clientTopics);
  tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic));
  if (tmq->clientTopics == NULL) {
    goto _return;
  }

  req.topicNum = sz;
  req.consumerId = tmq->consumerId;
  req.consumerGroup = strdup(tmq->groupId);
  req.topicNames = taosArrayInit(sz, sizeof(void*));
  if (req.consumerGroup == NULL || req.topicNames == NULL) {
    goto _return;
  }

  // The database name is only needed when there is at least one topic, and
  // it is the same for all of them.
  if (sz > 0) {
    dbName = getDbOfConnection(tmq->pTscObj);
    if (dbName == NULL) {
      goto _return;
    }
  }

  for (int i = 0; i < sz; i++) {
    /*char* topicName = topic_list->elems[i];*/
    char* topicName = taosArrayGetP(container, i);

    SName name = {0};
    tNameSetDbName(&name, tmq->pTscObj->acctId, dbName, strlen(dbName));
    tNameFromString(&name, topicName, T_NAME_TABLE);

    char* topicFname = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFname == NULL) {
      goto _return;
    }
    tNameExtractFullName(&name, topicFname);
    tscDebug("subscribe topic: %s", topicFname);
    SMqClientTopic topic = {
        .sql = NULL,
        .sqlLen = 0,
        .topicId = 0,
        .topicName = topicFname,
        .vgs = NULL,
    };
    topic.vgs = taosArrayInit(0, sizeof(SMqClientVg));
    taosArrayPush(tmq->clientTopics, &topic);
    taosArrayPush(req.topicNames, &topicFname);
  }

  // A NULL buffer makes the serializer return the required length.
  int   tlen = tSerializeSCMSubscribeReq(NULL, &req);
  void* buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    goto _return;
  }

  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);
  /*printf("formatted: %s\n", dagStr);*/

  pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_SUBSCRIBE);
  if (pRequest == NULL) {
    tscError("failed to malloc request");
    taosMemoryFree(buf);
    goto _return;
  }

  SMqSubscribeCbParam param = {
      .rspErr = TMQ_RESP_ERR__SUCCESS,
      .tmq = tmq,
  };
  tsem_init(&param.rspSem, 0, 0);

  pRequest->body.requestMsg = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };

  SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  // `param` lives on this stack frame, so wait for the callback before
  // leaving.
  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);
  resp = param.rspErr;

_return:
  /*if (sendInfo != NULL) {*/
  /*destroySendMsgInfo(sendInfo);*/
  /*}*/

  if (dbName) taosMemoryFree(dbName);
  // Only the array is released: the name strings it points to are also
  // referenced by tmq->clientTopics.
  if (req.topicNames) taosArrayDestroy(req.topicNames);
  if (req.consumerGroup) taosMemoryFree(req.consumerGroup);

  return resp;
}

L
Liu Jicong 已提交
579
/* Stores the commit callback that new consumers copy from this configuration. */
void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) {
  conf->commit_cb = cb;
}
580

L
Liu Jicong 已提交
581 582 583 584 585 586 587 588 589 590 591 592 593 594 595
/*
 * Parses `sql`, serializes its AST into a create-stream request named
 * `streamName` with output table `tbName`, sends it to the mnode and waits
 * for the response.
 *
 * Returns the request object (NULL when validation fails before one is
 * built). On failure `terrno` is set and, if a request exists, copied into
 * its `code`.
 */
TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbName, const char* sql) {
  STscObj*     pTscObj = (STscObj*)taos;
  SRequestObj* pRequest = NULL;
  SQuery*      pQueryNode = NULL;
  char*        astStr = NULL;
  int32_t      sqlLen;
  int32_t      code = 0;

  terrno = TSDB_CODE_SUCCESS;
  if (taos == NULL || streamName == NULL || tbName == NULL || sql == NULL) {
    // Never hand a NULL pointer to "%s".
    tscError("invalid parameters for creating stream, connObj:%p, stream name:%s, sql:%s", taos,
             streamName ? streamName : "(null)", sql ? sql : "(null)");
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    goto _return;
  }
  sqlLen = strlen(sql);

  if (strlen(tbName) >= TSDB_TABLE_NAME_LEN) {
    tscError("output tb name too long, max length:%d", TSDB_TABLE_NAME_LEN - 1);
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    goto _return;
  }

  if (sqlLen > TSDB_MAX_ALLOWED_SQL_LEN) {
    tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    goto _return;
  }

  tscDebug("start to create stream: %s", streamName);

  CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
  CHECK_CODE_GOTO(parseSql(pRequest, false, &pQueryNode), _return);

  // todo check for invalid sql statement and return with error code

  CHECK_CODE_GOTO(nodesNodeToString(pQueryNode->pRoot, false, &astStr, NULL), _return);

  /*printf("%s\n", pStr);*/

  SName name = {.acctId = pTscObj->acctId, .type = TSDB_TABLE_NAME_T};
  // The stream name is copied into a fixed-size field; reject names that
  // would not fit.
  if (strlen(streamName) >= sizeof(name.tname)) {
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    goto _return;
  }
  strcpy(name.dbname, pRequest->pDb);
  strcpy(name.tname, streamName);

  SCMCreateStreamReq req = {
      .igExists = 1,
      .ast = astStr,
      .sql = (char*)sql,
  };
  tNameExtractFullName(&name, req.name);
  strcpy(req.outputSTbName, tbName);

  // A NULL buffer makes the serializer return the required length.
  int   tlen = tSerializeSCMCreateStreamReq(NULL, 0, &req);
  void* buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    // NOTE(review): terrno stays TSDB_CODE_SUCCESS on this path, so the caller
    // cannot see the allocation failure.
    goto _return;
  }

  tSerializeSCMCreateStreamReq(buf, tlen, &req);
  /*printf("formatted: %s\n", dagStr);*/

  pRequest->body.requestMsg = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };
  pRequest->type = TDMT_MND_CREATE_STREAM;

  SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
  SEpSet        epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  tsem_wait(&pRequest->body.rspSem);

_return:
  taosMemoryFreeClear(astStr);
  qDestroyQuery(pQueryNode);
  /*if (sendInfo != NULL) {*/
  /*destroySendMsgInfo(sendInfo);*/
  /*}*/

  if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
    pRequest->code = terrno;
  }

  return pRequest;
}

L
Liu Jicong 已提交
670
#if 0
/*
 * Disabled. Follows the same steps as tmq_create_stream(), but builds a
 * create-topic request and takes the SQL length from the caller.
 */
TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, int sqlLen) {
  STscObj*     pTscObj = (STscObj*)taos;
  SRequestObj* pRequest = NULL;
  SQuery*      pQueryNode = NULL;
  char*        astStr = NULL;

  terrno = TSDB_CODE_SUCCESS;
  if (taos == NULL || topicName == NULL || sql == NULL) {
    tscError("invalid parameters for creating topic, connObj:%p, topic name:%s, sql:%s", taos, topicName, sql);
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    goto _return;
  }

  if (strlen(topicName) >= TSDB_TOPIC_NAME_LEN) {
    tscError("topic name too long, max length:%d", TSDB_TOPIC_NAME_LEN - 1);
    terrno = TSDB_CODE_TSC_INVALID_INPUT;
    goto _return;
  }

  if (sqlLen > TSDB_MAX_ALLOWED_SQL_LEN) {
    tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    goto _return;
  }

  tscDebug("start to create topic: %s", topicName);

  int32_t code = TSDB_CODE_SUCCESS;
  CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
  CHECK_CODE_GOTO(parseSql(pRequest, true, &pQueryNode), _return);

  // todo check for invalid sql statement and return with error code

  CHECK_CODE_GOTO(nodesNodeToString(pQueryNode->pRoot, false, &astStr, NULL), _return);

  /*printf("%s\n", pStr);*/

  SName name = {.acctId = pTscObj->acctId, .type = TSDB_TABLE_NAME_T};
  strcpy(name.dbname, pRequest->pDb);
  strcpy(name.tname, topicName);

  SCMCreateTopicReq req = {
      .igExists = 1,
      .ast = astStr,
      .sql = (char*)sql,
  };
  tNameExtractFullName(&name, req.name);

  int   tlen = tSerializeSCMCreateTopicReq(NULL, 0, &req);
  void* buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
    goto _return;
  }

  tSerializeSCMCreateTopicReq(buf, tlen, &req);
  /*printf("formatted: %s\n", dagStr);*/

  pRequest->body.requestMsg = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };
  pRequest->type = TDMT_MND_CREATE_TOPIC;

  SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
  SEpSet        epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  tsem_wait(&pRequest->body.rspSem);

_return:
  taosMemoryFreeClear(astStr);
  qDestroyQuery(pQueryNode);
  /*if (sendInfo != NULL) {*/
  /*destroySendMsgInfo(sendInfo);*/
  /*}*/

  if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
    pRequest->code = terrno;
  }

  return pRequest;
}
#endif
757

L
Liu Jicong 已提交
758
/*
 * Formats the timestamp `val` as local time, "YYYY-mm-dd HH:MM:SS.fraction",
 * into `buf` and returns `buf`. The fraction has 9, 6 or 3 digits for
 * nanosecond, microsecond or any other precision respectively.
 *
 * `buf` must hold at least 35 bytes. If the local-time conversion fails,
 * `buf` is set to the empty string.
 */
static char* formatTimestamp(char* buf, int64_t val, int precision) {
  enum { kTimestampBufLen = 35 };

  int32_t fracScale;   // sub-second units per second for this precision
  int     fracDigits;  // printed width of the fractional part
  if (precision == TSDB_TIME_PRECISION_NANO) {
    fracScale = 1000000000;
    fracDigits = 9;
  } else if (precision == TSDB_TIME_PRECISION_MICRO) {
    fracScale = 1000000;
    fracDigits = 6;
  } else {
    fracScale = 1000;
    fracDigits = 3;
  }

  time_t  tt = (time_t)(val / fracScale);
  int32_t frac = (int32_t)(val % fracScale);

  // Division truncates toward zero, so a negative value leaves a negative
  // remainder: borrow one second to make the fraction non-negative.
  if (tt <= 0 && frac < 0) {
    tt--;
    frac += fracScale;
  }

#ifdef WINDOWS
  // A negative time value may crash the local-time conversion on Windows.
  // Clamp after the borrow above so the value can never end up negative.
  if (tt < 0) tt = 0;
#endif

  struct tm* ptm = taosLocalTime(&tt, NULL);
  if (ptm == NULL) {
    buf[0] = '\0';
    return buf;
  }

  size_t pos = strftime(buf, kTimestampBufLen, "%Y-%m-%d %H:%M:%S", ptm);
  snprintf(buf + pos, kTimestampBufLen - pos, ".%0*d", fracDigits, frac);

  return buf;
}
L
Liu Jicong 已提交
807
#if 0
/* Disabled. Returns the skipLogNum field of the message's poll response, or 0 for NULL. */
int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) {
  if (tmq_message == NULL) return 0;
  SMqPollRsp* pRsp = &tmq_message->msg;
  return pRsp->skipLogNum;
}

/*
 * Disabled. Prints the first two columns of every row in the message's data
 * blocks; the column header is printed only on the first call.
 */
void tmqShowMsg(tmq_message_t* tmq_message) {
  if (tmq_message == NULL) return;

  static bool noPrintSchema;
  char        pBuf[128];
  SMqPollRsp* pRsp = &tmq_message->msg;
  int32_t     colNum = 2;
  if (!noPrintSchema) {
    printf("|");
    for (int32_t i = 0; i < colNum; i++) {
      if (i == 0)
        printf(" %25s |", pRsp->schema->pSchema[i].name);
      else
        printf(" %15s |", pRsp->schema->pSchema[i].name);
    }
    printf("\n");
    printf("===============================================\n");
    noPrintSchema = true;
  }
  int32_t sz = taosArrayGetSize(pRsp->pBlockData);
  for (int32_t i = 0; i < sz; i++) {
    SSDataBlock* pDataBlock = taosArrayGet(pRsp->pBlockData, i);
    int32_t      rows = pDataBlock->info.rows;
    for (int32_t j = 0; j < rows; j++) {
      printf("|");
      for (int32_t k = 0; k < colNum; k++) {
        SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
        void*            var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
        switch (pColInfoData->info.type) {
          case TSDB_DATA_TYPE_TIMESTAMP:
            formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI);
            printf(" %25s |", pBuf);
            break;
          case TSDB_DATA_TYPE_INT:
          case TSDB_DATA_TYPE_UINT:
            printf(" %15u |", *(uint32_t*)var);
            break;
        }
      }
      printf("\n");
    }
  }
}
#endif
L
Liu Jicong 已提交
858 859

int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
L
fix  
Liu Jicong 已提交
860
  /*printf("recv poll\n");*/
X
Xiaoyu Wang 已提交
861 862
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;
  SMqClientVg*    pVg = pParam->pVg;
L
Liu Jicong 已提交
863
  SMqClientTopic* pTopic = pParam->pTopic;
X
Xiaoyu Wang 已提交
864
  tmq_t*          tmq = pParam->tmq;
L
Liu Jicong 已提交
865
  if (code != 0) {
L
Liu Jicong 已提交
866
    tscWarn("msg discard from vg %d, epoch %d, code:%x", pParam->vgId, pParam->epoch, code);
L
fix txn  
Liu Jicong 已提交
867
    goto CREATE_MSG_FAIL;
L
Liu Jicong 已提交
868 869
  }

X
Xiaoyu Wang 已提交
870 871 872
  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch < tmqEpoch) {
L
fix  
Liu Jicong 已提交
873
    /*printf("discard rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch);*/
L
temp  
Liu Jicong 已提交
874
    /*tsem_post(&tmq->rspSem);*/
L
Liu Jicong 已提交
875 876
    tscWarn("msg discard from vg %d since from earlier epoch, rsp epoch %d, current epoch %d", pParam->vgId, msgEpoch,
            tmqEpoch);
X
Xiaoyu Wang 已提交
877 878 879 880
    return 0;
  }

  if (msgEpoch != tmqEpoch) {
L
Liu Jicong 已提交
881
    tscWarn("mismatch rsp from vg %d, epoch %d, current epoch %d", pParam->vgId, msgEpoch, tmqEpoch);
L
fix  
Liu Jicong 已提交
882 883
  } else {
    atomic_sub_fetch_32(&tmq->waitingRequest, 1);
X
Xiaoyu Wang 已提交
884 885
  }

L
Liu Jicong 已提交
886
#if 0
L
Liu Jicong 已提交
887
  if (pParam->sync == 1) {
wafwerar's avatar
wafwerar 已提交
888
    /**pParam->msg = taosMemoryMalloc(sizeof(tmq_message_t));*/
L
Liu Jicong 已提交
889 890 891 892 893 894 895 896 897 898 899 900 901 902
    *pParam->msg = taosAllocateQitem(sizeof(tmq_message_t));
    if (*pParam->msg) {
      memcpy(*pParam->msg, pMsg->pData, sizeof(SMqRspHead));
      tDecodeSMqConsumeRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &((*pParam->msg)->consumeRsp));
      if ((*pParam->msg)->consumeRsp.numOfTopics != 0) {
        pVg->currentOffset = (*pParam->msg)->consumeRsp.rspOffset;
      }
      taosWriteQitem(tmq->mqueue, *pParam->msg);
      tsem_post(&pParam->rspSem);
      return 0;
    }
    tsem_post(&pParam->rspSem);
    return -1;
  }
L
Liu Jicong 已提交
903
#endif
L
Liu Jicong 已提交
904

wafwerar's avatar
wafwerar 已提交
905
  /*SMqConsumeRsp* pRsp = taosMemoryCalloc(1, sizeof(SMqConsumeRsp));*/
L
Liu Jicong 已提交
906 907 908
  /*tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t));*/
  SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper));
  if (pRspWrapper == NULL) {
L
Liu Jicong 已提交
909
    tscWarn("msg discard from vg %d, epoch %d since out of memory", pParam->vgId, pParam->epoch);
L
fix txn  
Liu Jicong 已提交
910
    goto CREATE_MSG_FAIL;
L
Liu Jicong 已提交
911
  }
L
Liu Jicong 已提交
912 913 914 915 916 917
  pRspWrapper->tmqRspType = TMQ_MSG_TYPE__POLL_RSP;
  pRspWrapper->vgHandle = pVg;
  pRspWrapper->topicHandle = pTopic;
  /*memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead));*/
  memcpy(&pRspWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
  tDecodeSMqPollRspV2(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->msg);
L
Liu Jicong 已提交
918 919
  // TODO: alloc mem
  /*pRsp->*/
920
  /*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/
L
Liu Jicong 已提交
921

L
temp  
Liu Jicong 已提交
922
#if 0
L
Liu Jicong 已提交
923
  if (pRsp->msg.numOfTopics == 0) {
L
fix  
Liu Jicong 已提交
924
    /*printf("no data\n");*/
X
Xiaoyu Wang 已提交
925
    taosFreeQitem(pRsp);
L
fix txn  
Liu Jicong 已提交
926
    goto CREATE_MSG_FAIL;
L
Liu Jicong 已提交
927
  }
L
temp  
Liu Jicong 已提交
928
#endif
L
Liu Jicong 已提交
929

L
Liu Jicong 已提交
930 931
  tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld", tmq->consumerId, pVg->vgId,
           pRspWrapper->msg.reqOffset, pRspWrapper->msg.rspOffset);
L
fix  
Liu Jicong 已提交
932

L
Liu Jicong 已提交
933
  taosWriteQitem(tmq->mqueue, pRspWrapper);
L
Liu Jicong 已提交
934
  atomic_add_fetch_32(&tmq->readyRequest, 1);
L
temp  
Liu Jicong 已提交
935
  /*tsem_post(&tmq->rspSem);*/
936
  return 0;
L
Liu Jicong 已提交
937

L
fix txn  
Liu Jicong 已提交
938
CREATE_MSG_FAIL:
L
Liu Jicong 已提交
939 940 941
  if (pParam->epoch == tmq->epoch) {
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }
L
temp  
Liu Jicong 已提交
942
  /*tsem_post(&tmq->rspSem);*/
L
Liu Jicong 已提交
943
  return code;
944 945
}

X
Xiaoyu Wang 已提交
946
/*
 * Rebuild the consumer's topic/vgroup assignment from a subscription-endpoint response.
 *
 * tmq   : consumer whose clientTopics list and epoch are replaced.
 * epoch : epoch carried by the response; stored into tmq->epoch at the end.
 * pRsp  : decoded response; pRsp->topics is read, not modified.
 *
 * For every topic in the response a new SMqClientTopic is created. A vgroup that was
 * already present under the same topic name keeps its current offset; any other vgroup
 * starts from the offset supplied in the response.
 *
 * Returns true when at least one vgroup was assigned, false otherwise (including when the
 * new topic array or the lookup hash cannot be allocated).
 *
 * NOTE(review): only the outer array of the previous clientTopics is destroyed here; the
 * per-topic topicName / vgs of the old list are not released in this function -- confirm
 * who owns them before adding a free.
 */
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
  bool    set = false;
  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];
  tscDebug("consumer %ld update ep epoch %d to epoch %d, topic num: %d", tmq->consumerId, tmq->epoch, epoch,
           topicNumGet);

  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    return false;
  }
  SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pHash == NULL) {
    taosArrayDestroy(newTopics);
    return false;
  }

  for (int32_t i = 0; i < topicNumGet; i++) {
    SMqClientTopic topic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
    topic.schema = pTopicEp->schema;
    // the hash is reused per topic: "<topic>:<vgId>" -> current offset of the old assignment
    taosHashClear(pHash);
    topic.topicName = strdup(pTopicEp->topic);

    tscDebug("consumer %ld update topic: %s", tmq->consumerId, topic.topicName);

    // remember the offsets of the vgroups currently assigned under the same topic name
    int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
    for (int32_t j = 0; j < topicNumCur; j++) {
      SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j);
      if (pTopicCur->vgs && strcmp(pTopicCur->topicName, pTopicEp->topic) == 0) {
        int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
        tscDebug("consumer %ld new vg num: %d", tmq->consumerId, vgNumCur);
        for (int32_t k = 0; k < vgNumCur; k++) {
          SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, k);
          // bounded write: never runs past vgKey even for an unexpectedly long topic name
          snprintf(vgKey, sizeof(vgKey), "%s:%d", topic.topicName, pVgCur->vgId);
          tscDebug("consumer %ld epoch %d vg %d build %s", tmq->consumerId, epoch, pVgCur->vgId, vgKey);
          taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t));
        }
        break;  // only the first topic with a matching name is considered
      }
    }

    // build the new vgroup list, carrying over known offsets
    int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
    topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
    for (int32_t j = 0; j < vgNumGet; j++) {
      SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
      snprintf(vgKey, sizeof(vgKey), "%s:%d", topic.topicName, pVgEp->vgId);
      int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
      int64_t  offset = pVgEp->offset;
      tscDebug("consumer %ld epoch %d vg %d offset og to %ld", tmq->consumerId, epoch, pVgEp->vgId, offset);
      if (pOffset != NULL) {
        offset = *pOffset;
        tscDebug("consumer %ld epoch %d vg %d found %s", tmq->consumerId, epoch, pVgEp->vgId, vgKey);
      }
      tscDebug("consumer %ld epoch %d vg %d offset set to %ld", tmq->consumerId, epoch, pVgEp->vgId, offset);
      SMqClientVg clientVg = {
          .pollCnt = 0,
          .currentOffset = offset,
          .vgId = pVgEp->vgId,
          .epSet = pVgEp->epSet,
          .vgStatus = TMQ_VG_STATUS__IDLE,
          .vgSkipCnt = 0,
      };
      taosArrayPush(topic.vgs, &clientVg);
      set = true;
    }
    taosArrayPush(newTopics, &topic);
  }

  if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
  taosHashCleanup(pHash);
  tmq->clientTopics = newTopics;
  // publish the new epoch last, after the new topic list is in place
  atomic_store_32(&tmq->epoch, epoch);
  return set;
}

L
Liu Jicong 已提交
1023
int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
1024
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
L
Liu Jicong 已提交
1025
  tmq_t*           tmq = pParam->tmq;
1026
  if (code != 0) {
L
temp  
Liu Jicong 已提交
1027
    tscError("consumer %ld get topic endpoint error, not ready, wait:%d", tmq->consumerId, pParam->sync);
L
Liu Jicong 已提交
1028
    goto END;
1029
  }
L
Liu Jicong 已提交
1030

L
Liu Jicong 已提交
1031
  // tmq's epoch is monotonically increase,
L
Liu Jicong 已提交
1032
  // so it's safe to discard any old epoch msg.
L
Liu Jicong 已提交
1033
  // Epoch will only increase when received newer epoch ep msg
L
Liu Jicong 已提交
1034 1035
  SMqRspHead* head = pMsg->pData;
  int32_t     epoch = atomic_load_32(&tmq->epoch);
L
temp  
Liu Jicong 已提交
1036
  tscDebug("consumer %ld recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
L
Liu Jicong 已提交
1037 1038
  if (head->epoch <= epoch) {
    goto END;
1039
  }
L
Liu Jicong 已提交
1040

X
Xiaoyu Wang 已提交
1041 1042 1043 1044 1045
  if (pParam->sync) {
    SMqCMGetSubEpRsp rsp;
    tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
    /*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/
    /*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/
L
Liu Jicong 已提交
1046
    if (tmqUpdateEp(tmq, head->epoch, &rsp)) {
X
Xiaoyu Wang 已提交
1047
      atomic_store_64(&tmq->status, TMQ_CONSUMER_STATUS__READY);
1048
    }
X
Xiaoyu Wang 已提交
1049 1050
    tDeleteSMqCMGetSubEpRsp(&rsp);
  } else {
L
Liu Jicong 已提交
1051 1052 1053
    /*SMqCMGetSubEpRsp* pRsp = taosAllocateQitem(sizeof(SMqCMGetSubEpRsp));*/
    SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper));
    if (pWrapper == NULL) {
X
Xiaoyu Wang 已提交
1054
      terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
1055 1056
      code = -1;
      goto END;
X
Xiaoyu Wang 已提交
1057
    }
L
Liu Jicong 已提交
1058 1059 1060 1061
    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pWrapper->epoch = head->epoch;
    memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
    tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);
L
Liu Jicong 已提交
1062

L
Liu Jicong 已提交
1063
    taosWriteQitem(tmq->mqueue, pWrapper);
L
temp  
Liu Jicong 已提交
1064
    /*tsem_post(&tmq->rspSem);*/
1065
  }
L
Liu Jicong 已提交
1066 1067

END:
L
fix txn  
Liu Jicong 已提交
1068
  atomic_store_8(&tmq->epStatus, 0);
L
Liu Jicong 已提交
1069 1070 1071 1072
  if (pParam->sync) {
    tsem_post(&pParam->rspSem);
  }
  return code;
1073 1074
}

X
Xiaoyu Wang 已提交
1075
/*
 * Ask the management node for the consumer's current topic/vgroup assignment.
 *
 * tmq  : the consumer.
 * sync : when true, block on the request semaphore until tmqAskEpCb posts it.
 *
 * At most one request is meant to be outstanding: epStatus is flipped 0 -> 1 here and
 * cleared by tmqAskEpCb. While a request is outstanding the call is skipped, but after
 * 5000 consecutive skips a new request is sent anyway.
 *
 * Returns 0 when a request was sent or skipped, -1 when an allocation failed (epStatus is
 * cleared again in that case).
 */
int32_t tmqAskEp(tmq_t* tmq, bool sync) {
  SMqCMGetSubEpReq* pReq = NULL;
  SMqAskEpCbParam*  pCbParam = NULL;
  SMsgSendInfo*     pSendInfo = NULL;

  if (atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1) == 1) {
    int32_t skipped = atomic_add_fetch_32(&tmq->epSkipCnt, 1);
    tscTrace("consumer %ld skip ask ep cnt %d", tmq->consumerId, skipped);
    if (skipped < 5000) return 0;
  }
  atomic_store_32(&tmq->epSkipCnt, 0);

  // request body: consumer id and epoch in network byte order, plus the consumer group
  int32_t reqLen = sizeof(SMqCMGetSubEpReq);
  pReq = taosMemoryMalloc(reqLen);
  if (pReq == NULL) {
    tscError("failed to malloc get subscribe ep buf");
    goto FAIL;
  }
  pReq->consumerId = htobe64(tmq->consumerId);
  pReq->epoch = htonl(tmq->epoch);
  strcpy(pReq->cgroup, tmq->groupId);

  // callback context
  pCbParam = taosMemoryMalloc(sizeof(SMqAskEpCbParam));
  if (pCbParam == NULL) {
    tscError("failed to malloc subscribe param");
    goto FAIL_FREE_REQ;
  }
  pCbParam->tmq = tmq;
  pCbParam->sync = sync;
  tsem_init(&pCbParam->rspSem, 0, 0);

  pSendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo));
  if (pSendInfo == NULL) {
    goto FAIL_FREE_PARAM;
  }
  pSendInfo->msgInfo = (SDataBuf){
      .pData = pReq,
      .len = reqLen,
      .handle = NULL,
  };
  pSendInfo->requestId = generateRequestId();
  pSendInfo->requestObjRefId = 0;
  pSendInfo->param = pCbParam;
  pSendInfo->fp = tmqAskEpCb;
  pSendInfo->msgType = TDMT_MND_GET_SUB_EP;

  SEpSet mgmtEpSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  tscDebug("consumer %ld ask ep", tmq->consumerId);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &mgmtEpSet, &transporterId, pSendInfo);

  if (sync) tsem_wait(&pCbParam->rspSem);
  return 0;

  // failure exits: release what was acquired, newest first, then re-open the epStatus gate
FAIL_FREE_PARAM:
  tsem_destroy(&pCbParam->rspSem);
  taosMemoryFree(pCbParam);
FAIL_FREE_REQ:
  taosMemoryFree(pReq);
FAIL:
  atomic_store_8(&tmq->epStatus, 0);
  return -1;
}

L
Liu Jicong 已提交
1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150
/*
 * Move the consumer's read position for one vgroup of one topic.
 *
 * offset names the consumer group, topic, vgroup id and the new offset. The call fails
 * when the group differs from the consumer's group or when no assigned topic/vgroup
 * matches. On success the vgroup's current offset is overwritten and
 * tmqClearUnhandleMsg(tmq) is invoked.
 *
 * Returns TMQ_RESP_ERR__SUCCESS or TMQ_RESP_ERR__FAIL.
 */
tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
  const SMqOffset* pTarget = &offset->offset;

  if (strcmp(pTarget->cgroup, tmq->groupId) != 0) {
    return TMQ_RESP_ERR__FAIL;
  }

  int32_t topicCnt = taosArrayGetSize(tmq->clientTopics);
  for (int32_t t = 0; t < topicCnt; t++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, t);
    if (strcmp(pTopic->topicName, pTarget->topicName) != 0) {
      continue;
    }

    int32_t vgCnt = taosArrayGetSize(pTopic->vgs);
    for (int32_t v = 0; v < vgCnt; v++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, v);
      if (pVg->vgId != pTarget->vgId) {
        continue;
      }
      pVg->currentOffset = pTarget->offset;
      tmqClearUnhandleMsg(tmq);
      return TMQ_RESP_ERR__SUCCESS;
    }
  }

  return TMQ_RESP_ERR__FAIL;
}

L
Liu Jicong 已提交
1160
/*
 * Allocate and fill a poll request for one vgroup of one topic.
 *
 * tmq          : consumer issuing the poll (group id, consumer id, epoch, reset policy).
 * blockingTime : copied into the request unchanged.
 * pTopic       : topic whose name is copied into the request.
 * pVg          : target vgroup; supplies the vgroup id and the offset to read from.
 *
 * The request offset is the vgroup's current offset when that is >= 0; otherwise the
 * consumer's reset-offset setting is used, and the call fails if that setting is
 * TMQ_CONF__RESET_OFFSET__NONE.
 *
 * The buffer is zero-initialised because callers send all sizeof(SMqPollReq) bytes: with a
 * plain malloc the bytes after the string terminators and any field not assigned below
 * would be uninitialised heap memory.
 *
 * Returns the request (released by the caller / send path) or NULL on failure.
 */
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTopic* pTopic, SMqClientVg* pVg) {
  int64_t reqOffset;
  if (pVg->currentOffset >= 0) {
    reqOffset = pVg->currentOffset;
  } else {
    if (tmq->resetOffsetCfg == TMQ_CONF__RESET_OFFSET__NONE) {
      tscError("unable to poll since no committed offset but reset offset is set to none");
      return NULL;
    }
    reqOffset = tmq->resetOffsetCfg;
  }

  SMqPollReq* pReq = taosMemoryCalloc(1, sizeof(SMqPollReq));
  if (pReq == NULL) {
    return NULL;
  }

  strcpy(pReq->topic, pTopic->topicName);
  strcpy(pReq->cgroup, tmq->groupId);

  pReq->blockingTime = blockingTime;
  pReq->consumerId = tmq->consumerId;
  pReq->epoch = tmq->epoch;
  pReq->currentOffset = reqOffset;
  pReq->reqId = generateRequestId();

  // message head fields are sent in network byte order
  pReq->head.vgId = htonl(pVg->vgId);
  pReq->head.contLen = htonl(sizeof(SMqPollReq));
  return pReq;
}

L
Liu Jicong 已提交
1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202
/*
 * Turn a queued poll response into the result object handed back to the application.
 *
 * pWrapper : poll response together with the topic and vgroup it belongs to.
 *
 * One SReqResultInfo is produced per entry of msg.blockPos; each entry is a byte offset
 * into msg.blockData where an SRetrieveTableRsp starts. The topic schema is attached to
 * every result.
 *
 * Returns the new SMqRspObj, or NULL with terrno = TSDB_CODE_OUT_OF_MEMORY when the
 * object, its result array or the topic-name copy cannot be allocated. pWrapper is not
 * released on either path.
 */
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqPollRspV2* pRsp = &pWrapper->msg;
  int32_t       blockNum = taosArrayGetSize(pRsp->blockPos);

  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  if (pRspObj == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRspObj->res = taosArrayInit(blockNum, sizeof(SReqResultInfo));
  if (pRspObj->res == NULL) {
    taosMemoryFree(pRspObj);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  // the name copy is made last so the failure paths never have to release it
  pRspObj->topic = strdup(pWrapper->topicHandle->topicName);
  if (pRspObj->topic == NULL) {
    taosArrayDestroy(pRspObj->res);
    taosMemoryFree(pRspObj);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pRspObj->resType = RES_TYPE__TMQ;
  pRspObj->resIter = -1;
  pRspObj->vgId = pWrapper->vgHandle->vgId;

  for (int32_t i = 0; i < blockNum; i++) {
    int32_t            pos = *(int32_t*)taosArrayGet(pRsp->blockPos, i);
    SRetrieveTableRsp* pRetrieve = POINTER_SHIFT(pRsp->blockData, pos);
    SReqResultInfo     resInfo = {0};
    resInfo.totalRows = 0;
    resInfo.precision = TSDB_TIME_PRECISION_MILLI;
    setResSchemaInfo(&resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
    setQueryResultFromRsp(&resInfo, pRetrieve, true);
    taosArrayPush(pRspObj->res, &resInfo);
  }
  return pRspObj;
}

L
Liu Jicong 已提交
1213
#if 0
L
Liu Jicong 已提交
1214 1215 1216 1217 1218 1219 1220 1221 1222 1223
/*
 * DISABLED: this definition sits inside an #if 0 region and is not compiled.
 *
 * Synchronous variant of tmqPollImpl. For the first vgroup reached it builds a poll
 * request, sends it with tmqPollCb as callback (pParam->sync = 1), waits on the request
 * semaphore, then reads tmq->mqueue until an item of type TMQ_MSG_TYPE__POLL_RSP is found
 * and returns that item. Because it returns from inside the inner loop, at most one
 * vgroup is polled per call. Returns NULL on allocation failure or when there is no
 * vgroup to poll.
 *
 * NOTE(review): the result of the vgStatus compare-exchange is ignored (the check is
 * commented out); pReq is not released when pParam allocation fails, and neither pReq nor
 * pParam is released when sendInfo allocation fails. These would need attention before
 * re-enabling this code.
 */
tmq_message_t* tmqSyncPollImpl(tmq_t* tmq, int64_t blockingTime) {
  tmq_message_t* msg = NULL;
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      int32_t      vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
      /*if (vgStatus != TMQ_VG_STATUS__IDLE) {*/
      /*continue;*/
      /*}*/
L
Liu Jicong 已提交
1224
      SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg);
L
Liu Jicong 已提交
1225 1226 1227 1228 1229
      if (pReq == NULL) {
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        // TODO: out of mem
        return NULL;
      }
X
Xiaoyu Wang 已提交
1230

wafwerar's avatar
wafwerar 已提交
1231
      SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
L
Liu Jicong 已提交
1232
      if (pParam == NULL) {
L
Liu Jicong 已提交
1233 1234 1235 1236
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        // TODO: out of mem
        return NULL;
      }
L
Liu Jicong 已提交
1237 1238 1239 1240 1241 1242 1243
      pParam->tmq = tmq;
      pParam->pVg = pVg;
      pParam->epoch = tmq->epoch;
      pParam->sync = 1;
      pParam->msg = &msg;
      tsem_init(&pParam->rspSem, 0, 0);

wafwerar's avatar
wafwerar 已提交
1244
      SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo));
L
Liu Jicong 已提交
1245 1246 1247 1248 1249
      if (sendInfo == NULL) {
        return NULL;
      }

      sendInfo->msgInfo = (SDataBuf){
L
Liu Jicong 已提交
1250
          .pData = pReq,
L
Liu Jicong 已提交
1251
          .len = sizeof(SMqPollReq),
L
Liu Jicong 已提交
1252 1253
          .handle = NULL,
      };
L
Liu Jicong 已提交
1254
      sendInfo->requestId = generateRequestId();
L
Liu Jicong 已提交
1255
      sendInfo->requestObjRefId = 0;
L
Liu Jicong 已提交
1256
      sendInfo->param = pParam;
L
Liu Jicong 已提交
1257
      sendInfo->fp = tmqPollCb;
L
Liu Jicong 已提交
1258
      sendInfo->msgType = TDMT_VND_CONSUME;
L
Liu Jicong 已提交
1259 1260 1261 1262 1263 1264 1265 1266

      int64_t transporterId = 0;
      /*printf("send poll\n");*/
      atomic_add_fetch_32(&tmq->waitingRequest, 1);
      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
      pVg->pollCnt++;
      tmq->pollCnt++;

L
Liu Jicong 已提交
1267
      tsem_wait(&pParam->rspSem);
L
Liu Jicong 已提交
1268 1269 1270 1271
      tmq_message_t* nmsg = NULL;
      while (1) {
        taosReadQitem(tmq->mqueue, (void**)&nmsg);
        if (nmsg == NULL) continue;
L
Liu Jicong 已提交
1272 1273 1274
        while (nmsg->head.mqMsgType != TMQ_MSG_TYPE__POLL_RSP) {
          taosReadQitem(tmq->mqueue, (void**)&nmsg);
        }
L
Liu Jicong 已提交
1275 1276 1277
        return nmsg;
      }
    }
X
Xiaoyu Wang 已提交
1278
  }
L
Liu Jicong 已提交
1279
  return NULL;
X
Xiaoyu Wang 已提交
1280
}
L
Liu Jicong 已提交
1281
#endif
X
Xiaoyu Wang 已提交
1282 1283

int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
L
fix  
Liu Jicong 已提交
1284
  /*printf("call poll\n");*/
X
Xiaoyu Wang 已提交
1285 1286 1287 1288 1289 1290
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
      int32_t      vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
      if (vgStatus != TMQ_VG_STATUS__IDLE) {
L
Liu Jicong 已提交
1291
        int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
L
Liu Jicong 已提交
1292
        tscTrace("consumer %ld epoch %d skip vg %d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId, vgSkipCnt);
X
Xiaoyu Wang 已提交
1293
        continue;
L
Liu Jicong 已提交
1294
        /*if (vgSkipCnt < 10000) continue;*/
L
temp  
Liu Jicong 已提交
1295 1296 1297 1298 1299 1300 1301
#if 0
        if (skipCnt < 30000) {
          continue;
        } else {
        tscDebug("consumer %ld skip vg %d skip too much reset", tmq->consumerId, pVg->vgId);
        }
#endif
X
Xiaoyu Wang 已提交
1302
      }
L
Liu Jicong 已提交
1303
      atomic_store_32(&pVg->vgSkipCnt, 0);
L
Liu Jicong 已提交
1304
      SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg);
X
Xiaoyu Wang 已提交
1305 1306
      if (pReq == NULL) {
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
L
temp  
Liu Jicong 已提交
1307
        /*tsem_post(&tmq->rspSem);*/
X
Xiaoyu Wang 已提交
1308 1309
        return -1;
      }
wafwerar's avatar
wafwerar 已提交
1310
      SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
L
Liu Jicong 已提交
1311
      if (pParam == NULL) {
wafwerar's avatar
wafwerar 已提交
1312
        taosMemoryFree(pReq);
X
Xiaoyu Wang 已提交
1313
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
L
temp  
Liu Jicong 已提交
1314
        /*tsem_post(&tmq->rspSem);*/
X
Xiaoyu Wang 已提交
1315 1316
        return -1;
      }
L
Liu Jicong 已提交
1317 1318
      pParam->tmq = tmq;
      pParam->pVg = pVg;
L
Liu Jicong 已提交
1319
      pParam->pTopic = pTopic;
L
Liu Jicong 已提交
1320
      pParam->vgId = pVg->vgId;
L
Liu Jicong 已提交
1321 1322 1323
      pParam->epoch = tmq->epoch;
      pParam->sync = 0;

wafwerar's avatar
wafwerar 已提交
1324
      SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo));
L
Liu Jicong 已提交
1325
      if (sendInfo == NULL) {
wafwerar's avatar
wafwerar 已提交
1326 1327
        taosMemoryFree(pReq);
        taosMemoryFree(pParam);
L
Liu Jicong 已提交
1328
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
L
temp  
Liu Jicong 已提交
1329
        /*tsem_post(&tmq->rspSem);*/
L
Liu Jicong 已提交
1330 1331 1332 1333
        return -1;
      }

      sendInfo->msgInfo = (SDataBuf){
X
Xiaoyu Wang 已提交
1334
          .pData = pReq,
L
Liu Jicong 已提交
1335
          .len = sizeof(SMqPollReq),
X
Xiaoyu Wang 已提交
1336 1337
          .handle = NULL,
      };
L
Liu Jicong 已提交
1338
      sendInfo->requestId = pReq->reqId;
X
Xiaoyu Wang 已提交
1339
      sendInfo->requestObjRefId = 0;
L
Liu Jicong 已提交
1340
      sendInfo->param = pParam;
X
Xiaoyu Wang 已提交
1341
      sendInfo->fp = tmqPollCb;
L
Liu Jicong 已提交
1342
      sendInfo->msgType = TDMT_VND_CONSUME;
X
Xiaoyu Wang 已提交
1343 1344

      int64_t transporterId = 0;
L
fix  
Liu Jicong 已提交
1345 1346
      /*printf("send poll\n");*/
      atomic_add_fetch_32(&tmq->waitingRequest, 1);
L
Liu Jicong 已提交
1347 1348
      tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %ld, reqId %lu", tmq->consumerId,
               pTopic->topicName, pVg->vgId, tmq->epoch, pVg->currentOffset, pReq->reqId);
L
fix  
Liu Jicong 已提交
1349
      /*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/
X
Xiaoyu Wang 已提交
1350 1351 1352 1353 1354 1355 1356 1357
      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
      pVg->pollCnt++;
      tmq->pollCnt++;
    }
  }
  return 0;
}

L
Liu Jicong 已提交
1358 1359
/*
 * Handle a queued response that is not a poll response.
 *
 * tmq        : the consumer.
 * rspWrapper : queued item; only TMQ_MSG_TYPE__EP_RSP is understood.
 * pReset     : set to true when the endpoint response carried a newer epoch and the
 *              assignment was rebuilt through tmqUpdateEp, false when it was stale.
 *              Left untouched when -1 is returned.
 *
 * The decoded endpoint message inside the wrapper is released here in both cases, the
 * same way the synchronous path in tmqAskEpCb releases its decoded response after
 * tmqUpdateEp; previously it was never released on this path. The wrapper itself is still
 * released by the caller.
 * NOTE(review): confirm that no other caller of this function releases the message body
 * itself.
 *
 * Returns 0 for an endpoint response, -1 for any other type.
 */
int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType != TMQ_MSG_TYPE__EP_RSP) {
    return -1;
  }

  SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
  SMqCMGetSubEpRsp*   rspMsg = &pEpRspWrapper->msg;

  bool newer = rspWrapper->epoch > atomic_load_32(&tmq->epoch);
  if (newer) {
    tmqUpdateEp(tmq, rspWrapper->epoch, rspMsg);
  }
  tDeleteSMqCMGetSubEpRsp(rspMsg);

  *pReset = newer;
  return 0;
}

L
Liu Jicong 已提交
1376
/*
 * Drain queued responses until one yields data for the application.
 *
 * Items are taken from tmq->qall; when that is empty it is refilled once from
 * tmq->mqueue. Each item is handled by type:
 *   - poll response of the current epoch: the vgroup offset is advanced and the vgroup
 *     set back to IDLE; a response without data is dropped, one with data is converted by
 *     tmqBuildRspFromWrapper and returned.
 *   - poll response of another epoch: dropped.
 *   - anything else: passed to tmqHandleNoPollRsp; when that reports a reset and
 *     pollIfReset is set, a new round of polls is sent with blockingTime.
 *
 * Returns the built response object, or NULL when both queues are empty.
 */
SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) {
  for (;;) {
    SMqRspWrapper* pItem = NULL;
    taosGetQitem(tmq->qall, (void**)&pItem);
    if (pItem == NULL) {
      taosReadAllQitems(tmq->mqueue, tmq->qall);
      taosGetQitem(tmq->qall, (void**)&pItem);
      if (pItem == NULL) {
        return NULL;
      }
    }

    if (pItem->tmqRspType != TMQ_MSG_TYPE__POLL_RSP) {
      bool reset = false;
      tmqHandleNoPollRsp(tmq, pItem, &reset);
      taosFreeQitem(pItem);
      if (pollIfReset && reset) {
        tscDebug("consumer %ld reset and repoll", tmq->consumerId);
        tmqPollImpl(tmq, blockingTime);
      }
      continue;
    }

    SMqPollRspWrapper* pPollItem = (SMqPollRspWrapper*)pItem;
    atomic_sub_fetch_32(&tmq->readyRequest, 1);

    if (pPollItem->msg.head.epoch != atomic_load_32(&tmq->epoch)) {
      // belongs to a different assignment epoch
      taosFreeQitem(pPollItem);
      continue;
    }

    SMqClientVg* pVg = pPollItem->vgHandle;
    pVg->currentOffset = pPollItem->msg.rspOffset;
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

    if (pPollItem->msg.dataLen == 0) {
      taosFreeQitem(pPollItem);
      continue;
    }

    return tmqBuildRspFromWrapper(pPollItem);
  }
}

L
Liu Jicong 已提交
1421
#if 0
L
Liu Jicong 已提交
1422
/*
 * DISABLED: this definition sits inside an #if 0 region and is not compiled.
 *
 * Earlier poll entry point built on tmqSyncPollImpl. It first asks for the endpoint
 * assignment (synchronously while the consumer status is TMQ_CONSUMER_STATUS__INIT), then
 * repeatedly calls tmqSyncPollImpl and returns the first message whose
 * consumeRsp.numOfTopics is non-zero. With blocking_time == 0 it gives up after a single
 * attempt; otherwise it keeps retrying until more than blocking_time milliseconds have
 * elapsed since entry, and returns NULL.
 */
tmq_message_t* tmq_consumer_poll_v1(tmq_t* tmq, int64_t blocking_time) {
X
Xiaoyu Wang 已提交
1423 1424
  tmq_message_t* rspMsg = NULL;
  int64_t        startTime = taosGetTimestampMs();
1425 1426

  int64_t status = atomic_load_64(&tmq->status);
X
Xiaoyu Wang 已提交
1427 1428
  tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT);

L
Liu Jicong 已提交
1429 1430 1431 1432 1433 1434 1435 1436 1437 1438 1439 1440 1441
  while (1) {
    rspMsg = tmqSyncPollImpl(tmq, blocking_time);
    if (rspMsg && rspMsg->consumeRsp.numOfTopics) {
      return rspMsg;
    }

    if (blocking_time != 0) {
      int64_t endTime = taosGetTimestampMs();
      if (endTime - startTime > blocking_time) {
        return NULL;
      }
    } else
      return NULL;
X
Xiaoyu Wang 已提交
1442
  }
L
Liu Jicong 已提交
1443
}
L
Liu Jicong 已提交
1444
#endif
X
Xiaoyu Wang 已提交
1445

L
Liu Jicong 已提交
1446 1447 1448
/*
 * Poll the subscribed topics for data.
 *
 * tmq           : the consumer.
 * blocking_time : timeout in milliseconds; 0 means keep cycling until data arrives.
 *
 * The endpoint assignment is refreshed first (synchronously while the consumer status is
 * still TMQ_CONSUMER_STATUS__INIT). Responses that are already queued are served before
 * any new poll is sent. After that, each cycle asks for the endpoint again, sends polls
 * to all idle vgroups and drains the response queue.
 *
 * Returns a result object as TAOS_RES*, or NULL once more than blocking_time
 * milliseconds have elapsed since entry without data.
 */
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
  int64_t beginMs = taosGetTimestampMs();

  // TODO: put into another thread or delayed queue
  tmqAskEp(tmq, atomic_load_64(&tmq->status) == TMQ_CONSUMER_STATUS__INIT);

  SMqRspObj* pResult = tmqHandleAllRsp(tmq, blocking_time, false);
  while (pResult == NULL) {
    tmqAskEp(tmq, false);
    tmqPollImpl(tmq, blocking_time);

    pResult = tmqHandleAllRsp(tmq, blocking_time, false);
    if (pResult != NULL) {
      break;
    }

    // the clock is only consulted when a timeout was requested
    if (blocking_time != 0 && taosGetTimestampMs() - beginMs > blocking_time) {
      tscDebug("consumer %ld (epoch %d) timeout, no rsp", tmq->consumerId, tmq->epoch);
      return NULL;
    }
  }
  return (TAOS_RES*)pResult;
}

#if 0
1481

L
Liu Jicong 已提交
1482
  if (blocking_time <= 0) blocking_time = 1;
L
Liu Jicong 已提交
1483 1484
  if (blocking_time > 1000) blocking_time = 1000;
  /*blocking_time = 1;*/
1485 1486 1487

  if (taosArrayGetSize(tmq->clientTopics) == 0) {
    tscDebug("consumer:%ld poll but not assigned", tmq->consumerId);
L
Liu Jicong 已提交
1488
    /*printf("over1\n");*/
wafwerar's avatar
wafwerar 已提交
1489
    taosMsleep(blocking_time);
1490 1491 1492 1493
    return NULL;
  }
  SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, tmq->nextTopicIdx);
  if (taosArrayGetSize(pTopic->vgs) == 0) {
L
Liu Jicong 已提交
1494
    /*printf("over2\n");*/
wafwerar's avatar
wafwerar 已提交
1495
    taosMsleep(blocking_time);
1496 1497 1498
    return NULL;
  }

L
Liu Jicong 已提交
1499
  tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics);
L
Liu Jicong 已提交
1500
  int32_t beginVgIdx = pTopic->nextVgIdx;
L
Liu Jicong 已提交
1501
  while (1) {
L
Liu Jicong 已提交
1502 1503 1504
    pTopic->nextVgIdx = (pTopic->nextVgIdx + 1) % taosArrayGetSize(pTopic->vgs);
    SMqClientVg* pVg = taosArrayGet(pTopic->vgs, pTopic->nextVgIdx);
    /*printf("consume vg %d, offset %ld\n", pVg->vgId, pVg->currentOffset);*/
L
Liu Jicong 已提交
1505
    SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blocking_time, pTopic, pVg);
L
Liu Jicong 已提交
1506 1507
    if (pReq == NULL) {
      ASSERT(false);
wafwerar's avatar
wafwerar 已提交
1508
      taosMsleep(blocking_time);
L
Liu Jicong 已提交
1509 1510
      return NULL;
    }
1511

wafwerar's avatar
wafwerar 已提交
1512
    SMqPollCbParam* param = taosMemoryMalloc(sizeof(SMqPollCbParam));
L
Liu Jicong 已提交
1513 1514
    if (param == NULL) {
      ASSERT(false);
wafwerar's avatar
wafwerar 已提交
1515
      taosMsleep(blocking_time);
L
Liu Jicong 已提交
1516 1517 1518 1519 1520 1521 1522 1523
      return NULL;
    }
    param->tmq = tmq;
    param->retMsg = &tmq_message;
    param->pVg = pVg;
    tsem_init(&param->rspSem, 0, 0);

    SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME);
X
Xiaoyu Wang 已提交
1524 1525 1526 1527 1528
    pRequest->body.requestMsg = (SDataBuf){
        .pData = pReq,
        .len = sizeof(SMqConsumeReq),
        .handle = NULL,
    };
1529

L
Liu Jicong 已提交
1530 1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542
    SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
    sendInfo->requestObjRefId = 0;
    sendInfo->param = param;
    sendInfo->fp = tmqPollCb;

    /*printf("req offset: %ld\n", pReq->offset);*/

    int64_t transporterId = 0;
    asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
    tmq->pollCnt++;

    tsem_wait(&param->rspSem);
    tsem_destroy(&param->rspSem);
wafwerar's avatar
wafwerar 已提交
1543
    taosMemoryFree(param);
L
Liu Jicong 已提交
1544 1545 1546

    if (tmq_message == NULL) {
      if (beginVgIdx == pTopic->nextVgIdx) {
wafwerar's avatar
wafwerar 已提交
1547
        taosMsleep(blocking_time);
L
Liu Jicong 已提交
1548 1549 1550 1551
      } else {
        continue;
      }
    }
L
Liu Jicong 已提交
1552

L
Liu Jicong 已提交
1553
    return tmq_message;
L
Liu Jicong 已提交
1554
  }
1555 1556 1557 1558

  /*tsem_wait(&pRequest->body.rspSem);*/

  /*if (body != NULL) {*/
L
Liu Jicong 已提交
1559
  /*destroySendMsgInfo(body);*/
1560 1561 1562
  /*}*/

  /*if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {*/
L
Liu Jicong 已提交
1563
  /*pRequest->code = terrno;*/
1564 1565 1566 1567
  /*}*/

  /*return pRequest;*/
}
X
Xiaoyu Wang 已提交
1568
#endif
1569

L
Liu Jicong 已提交
1570
#if 0
L
Liu Jicong 已提交
1571
/*
 * Send one TDMT_VND_CONSUME request per vgroup of every topic held in
 * tmq->clientTopics, using tmqCommitCb as the response callback.
 *
 * Parameters:
 *   tmq                   - consumer handle; its clientTopics array is walked.
 *   tmq_topic_vgroup_list - currently ignored: the non-NULL branch is an empty TODO.
 *   async                 - when zero, the function blocks on the per-request
 *                           semaphore after each send; when non-zero it does not wait.
 *
 * Returns: always 0.
 *
 * This definition sits between #if 0 / #endif and is therefore compiled out.
 *
 * NOTE(review): when the SMqCommitCbParam allocation fails the loop moves on to
 * the next vgroup without releasing the pReq and pRequest already created in
 * this iteration - confirm whether that is a leak.
 * NOTE(review): this function never destroys rspSem or frees pParam; presumably
 * tmqCommitCb owns that cleanup - verify against the callback.
 */
tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async) {
L
Liu Jicong 已提交
1572
  if (tmq_topic_vgroup_list != NULL) {
L
Liu Jicong 已提交
1573
    // TODO
L
Liu Jicong 已提交
1574 1575
  }

L
Liu Jicong 已提交
1576
  // TODO: change semaphore to gate
L
Liu Jicong 已提交
1577 1578 1579
  // Outer loop: topics; inner loop: vgroups of the current topic.
  for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
    for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
L
Liu Jicong 已提交
1580
      SMqClientVg*   pVg = taosArrayGet(pTopic->vgs, j);
L
Liu Jicong 已提交
1581
      SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, 0, pTopic, pVg);
L
Liu Jicong 已提交
1582

L
Liu Jicong 已提交
1583 1584
      SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME);
      pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq)};
wafwerar's avatar
wafwerar 已提交
1585
      SMqCommitCbParam* pParam = taosMemoryMalloc(sizeof(SMqCommitCbParam));
L
Liu Jicong 已提交
1586 1587 1588 1589 1590 1591 1592 1593 1594 1595 1596 1597 1598 1599 1600 1601 1602 1603 1604 1605
      if (pParam == NULL) {
        continue;
      }
      pParam->tmq = tmq;
      pParam->pVg = pVg;
      pParam->async = async;
      if (!async) tsem_init(&pParam->rspSem, 0, 0);

      SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
      sendInfo->requestObjRefId = 0;
      sendInfo->param = pParam;
      sendInfo->fp = tmqCommitCb;

      int64_t transporterId = 0;
      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);

      if (!async) tsem_wait(&pParam->rspSem);
    }
  }

L
Liu Jicong 已提交
1606
  return 0;
1607
}
L
Liu Jicong 已提交
1608
#endif
1609

L
Liu Jicong 已提交
1610
#if 0
1611 1612
/*
 * Legacy destructor for a tmq_message_t (compiled out: it sits between
 * #if 0 / #endif).
 *
 * A NULL message is a no-op. Otherwise the embedded poll response
 * (tmq_message->msg) is passed to tDeleteSMqConsumeRsp and the message itself
 * is then released with taosFreeQitem; the direct taosMemoryFree call is
 * commented out in favour of that.
 */
void tmq_message_destroy(tmq_message_t* tmq_message) {
  if (tmq_message == NULL) return;
L
Liu Jicong 已提交
1613
  SMqPollRsp* pRsp = &tmq_message->msg;
L
Liu Jicong 已提交
1614
  tDeleteSMqConsumeRsp(pRsp);
wafwerar's avatar
wafwerar 已提交
1615
  /*taosMemoryFree(tmq_message);*/
X
Xiaoyu Wang 已提交
1616
  taosFreeQitem(tmq_message);
1617
}
L
Liu Jicong 已提交
1618
#endif
1619

L
Liu Jicong 已提交
1620
/*
 * Close a consumer.
 *
 * Currently a stub: the handle is not touched and success is always reported.
 */
tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) {
  (void)tmq;  // intentionally unused for now
  return TMQ_RESP_ERR__SUCCESS;
}
L
Liu Jicong 已提交
1621 1622 1623 1624 1625 1626 1627

/*
 * Map a response code to a short, static, human-readable string.
 *
 * Returns "success" for TMQ_RESP_ERR__SUCCESS and "fail" for every other
 * value. The returned pointer refers to a string literal and must not be freed.
 */
const char* tmq_err2str(tmq_resp_err_t err) {
  return (err == TMQ_RESP_ERR__SUCCESS) ? "success" : "fail";
}
L
Liu Jicong 已提交
1628

L
Liu Jicong 已提交
1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652
/*
 * Return the topic name carried by a TMQ result.
 *
 * Parameters:
 *   res - result handle; may be NULL.
 *
 * Returns: the `topic` field of the underlying SMqRspObj when `res` is a TMQ
 * result, otherwise NULL (including when `res` itself is NULL). The pointer
 * refers to storage inside the result object; this function does not copy it.
 */
char* tmq_get_topic_name(TAOS_RES* res) {
  // TD_RES_TMQ is defined outside this view; check NULL first, as
  // tmq_message_destroy does, so the type test never sees a NULL handle.
  if (res == NULL || !TD_RES_TMQ(res)) {
    return NULL;
  }

  return ((SMqRspObj*)res)->topic;
}

/*
 * Return the vgroup id carried by a TMQ result.
 *
 * Parameters:
 *   res - result handle; may be NULL.
 *
 * Returns: the `vgId` field of the underlying SMqRspObj when `res` is a TMQ
 * result, otherwise -1 (including when `res` itself is NULL).
 */
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  // TD_RES_TMQ is defined outside this view; check NULL first, as
  // tmq_message_destroy does, so the type test never sees a NULL handle.
  if (res == NULL || !TD_RES_TMQ(res)) {
    return -1;
  }

  return ((SMqRspObj*)res)->vgId;
}

/*
 * Destroy a TMQ result.
 *
 * Parameters:
 *   res - result handle; NULL is accepted and ignored.
 *
 * Currently a placeholder: the handle is type-checked but nothing is released.
 * The previous body only cast `res` to SMqRspObj* into a local that was never
 * read, so that unused variable has been dropped.
 */
void tmq_message_destroy(TAOS_RES* res) {
  if (res == NULL) return;
  if (TD_RES_TMQ(res)) {
    // TODO: release the resources owned by the SMqRspObj behind `res`.
  }
}