tmq.c 55.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "clientInt.h"
#include "clientLog.h"
#include "parser.h"
H
Haojun Liao 已提交
19
#include "tdatablock.h"
20 21 22
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
X
Xiaoyu Wang 已提交
23
#include "tqueue.h"
24
#include "tref.h"
L
Liu Jicong 已提交
25 26
#include "ttimer.h"

L
Liu Jicong 已提交
27 28 29 30 31 32 33 34
// Forward declaration; the definition is not part of this section of the file.
// tmqHandleAllDelayedTask() calls it with async = true while handling a TMQ_DELAYED_TASK__HB item.
int32_t tmqAskEp(tmq_t* tmq, bool async);

// File-level TMQ state shared by all consumers created through this file.
typedef struct {
  int8_t inited;  // 0 until tmq_consumer_new() claims initialization with a compare-and-swap to 1
  tmr_h  timer;   // handle returned by taosTmrInit(); passed to taosTmrReset() when delayed tasks are re-armed
} SMqMgmt;

static SMqMgmt tmqMgmt = {0};
35

L
Liu Jicong 已提交
36 37 38 39 40 41
// Leading fields shared by the response wrappers in this file: SMqAskEpRspWrapper and
// SMqPollRspWrapper begin with the same two members. tmqClearUnhandleMsg() reads queued
// items through this type.
typedef struct {
  int8_t  tmqRspType;  // response kind tag (the values are not defined in this section)
  int32_t epoch;
} SMqRspWrapper;

typedef struct {
L
Liu Jicong 已提交
42 43 44
  int8_t      tmqRspType;
  int32_t     epoch;
  SMqAskEpRsp msg;
L
Liu Jicong 已提交
45 46
} SMqAskEpRspWrapper;

L
Liu Jicong 已提交
47
struct tmq_list_t {
L
Liu Jicong 已提交
48
  SArray container;
L
Liu Jicong 已提交
49
};
L
Liu Jicong 已提交
50

L
Liu Jicong 已提交
51
struct tmq_conf_t {
52 53 54 55 56 57 58 59 60 61 62
  char     clientId[256];
  char     groupId[TSDB_CGROUP_LEN];
  int8_t   autoCommit;
  int8_t   resetOffset;
  int8_t   withTbName;
  uint16_t port;
  int32_t  autoCommitInterval;
  char*    ip;
  char*    user;
  char*    pass;
  /*char*          db;*/
63
  tmq_commit_cb* commitCb;
L
Liu Jicong 已提交
64
  void*          commitCbUserParam;
L
Liu Jicong 已提交
65 66 67
};

struct tmq_t {
L
Liu Jicong 已提交
68
  // conf
L
Liu Jicong 已提交
69 70
  char           groupId[TSDB_CGROUP_LEN];
  char           clientId[256];
71
  int8_t         withTbName;
L
Liu Jicong 已提交
72
  int8_t         autoCommit;
L
Liu Jicong 已提交
73
  int32_t        autoCommitInterval;
L
Liu Jicong 已提交
74
  int32_t        resetOffsetCfg;
L
Liu Jicong 已提交
75
  int64_t        consumerId;
L
Liu Jicong 已提交
76 77
  tmq_commit_cb* commitCb;
  void*          commitCbUserParam;
L
Liu Jicong 已提交
78 79 80 81

  // status
  int8_t  status;
  int32_t epoch;
L
Liu Jicong 已提交
82 83
#if 0
  int8_t  epStatus;
L
Liu Jicong 已提交
84
  int32_t epSkipCnt;
L
Liu Jicong 已提交
85
#endif
L
Liu Jicong 已提交
86 87
  int64_t pollCnt;

L
Liu Jicong 已提交
88 89 90 91 92
  // timer
  tmr_h hbTimer;
  tmr_h reportTimer;
  tmr_h commitTimer;

L
Liu Jicong 已提交
93 94 95 96
  // connection
  STscObj* pTscObj;

  // container
L
Liu Jicong 已提交
97
  SArray*     clientTopics;  // SArray<SMqClientTopic>
L
Liu Jicong 已提交
98
  STaosQueue* mqueue;        // queue of rsp
L
Liu Jicong 已提交
99
  STaosQall*  qall;
L
Liu Jicong 已提交
100 101 102 103
  STaosQueue* delayedTask;  // delayed task queue for heartbeat and auto commit

  // ctl
  tsem_t rspSem;
L
Liu Jicong 已提交
104 105
};

X
Xiaoyu Wang 已提交
106 107 108 109 110 111 112 113
// Vgroup status values (presumably stored in SMqClientVg.vgStatus - confirm against users).
enum {
  TMQ_VG_STATUS__IDLE = 0,
  TMQ_VG_STATUS__WAIT = 1,
};

// Consumer status values (presumably stored in tmq_t.status - confirm against users).
enum {
  TMQ_CONSUMER_STATUS__INIT = 0,
  TMQ_CONSUMER_STATUS__READY = 1,
  TMQ_CONSUMER_STATUS__NO_TOPIC = 2,
};

L
Liu Jicong 已提交
117 118 119 120 121 122
// Task kinds written to tmq_t.delayedTask by the tmqAssignDelayed*Task() callbacks
// and dispatched by tmqHandleAllDelayedTask(). Numbering starts at 1.
enum {
  TMQ_DELAYED_TASK__HB = 1,
  TMQ_DELAYED_TASK__REPORT = 2,
  TMQ_DELAYED_TASK__COMMIT = 3,
};

L
Liu Jicong 已提交
123
typedef struct {
124 125 126
  // statistics
  int64_t pollCnt;
  // offset
127
  int64_t committedOffset;
128
  int64_t currentOffset;
L
Liu Jicong 已提交
129
  // connection info
130
  int32_t vgId;
X
Xiaoyu Wang 已提交
131
  int32_t vgStatus;
L
Liu Jicong 已提交
132
  int32_t vgSkipCnt;
133 134 135
  SEpSet  epSet;
} SMqClientVg;

L
Liu Jicong 已提交
136
typedef struct {
137
  // subscribe info
L
Liu Jicong 已提交
138
  char* topicName;
L
Liu Jicong 已提交
139
  char  db[TSDB_DB_FNAME_LEN];
L
Liu Jicong 已提交
140 141 142

  SArray* vgs;  // SArray<SMqClientVg>

L
Liu Jicong 已提交
143 144
  int8_t         isSchemaAdaptive;
  SSchemaWrapper schema;
145 146
} SMqClientTopic;

L
Liu Jicong 已提交
147 148 149 150 151
typedef struct {
  int8_t          tmqRspType;
  int32_t         epoch;
  SMqClientVg*    vgHandle;
  SMqClientTopic* topicHandle;
L
Liu Jicong 已提交
152 153 154 155
  union {
    SMqDataBlkRsp dataRsp;
    SMqMetaRsp    metaRsp;
  };
L
Liu Jicong 已提交
156 157
} SMqPollRspWrapper;

L
Liu Jicong 已提交
158
typedef struct {
L
Liu Jicong 已提交
159 160 161
  tmq_t*  tmq;
  tsem_t  rspSem;
  int32_t rspErr;
L
Liu Jicong 已提交
162
} SMqSubscribeCbParam;
L
Liu Jicong 已提交
163

L
Liu Jicong 已提交
164
typedef struct {
165
  tmq_t*  tmq;
L
Liu Jicong 已提交
166
  int32_t code;
L
Liu Jicong 已提交
167
  int32_t async;
X
Xiaoyu Wang 已提交
168
  tsem_t  rspSem;
169 170
} SMqAskEpCbParam;

L
Liu Jicong 已提交
171
typedef struct {
L
Liu Jicong 已提交
172 173
  tmq_t*          tmq;
  SMqClientVg*    pVg;
L
Liu Jicong 已提交
174
  SMqClientTopic* pTopic;
L
Liu Jicong 已提交
175
  int32_t         epoch;
L
Liu Jicong 已提交
176
  int32_t         vgId;
L
Liu Jicong 已提交
177
  tsem_t          rspSem;
X
Xiaoyu Wang 已提交
178
} SMqPollCbParam;
179

L
Liu Jicong 已提交
180
typedef struct {
L
Liu Jicong 已提交
181
  tmq_t*         tmq;
L
Liu Jicong 已提交
182 183
  int8_t         async;
  int8_t         automatic;
L
Liu Jicong 已提交
184
  int8_t         freeOffsets;
L
Liu Jicong 已提交
185
  tmq_commit_cb* userCb;
L
Liu Jicong 已提交
186
  tsem_t         rspSem;
L
Liu Jicong 已提交
187
  int32_t        rspErr;
L
Liu Jicong 已提交
188
  SArray*        offsets;
L
Liu Jicong 已提交
189
  void*          userParam;
L
Liu Jicong 已提交
190
} SMqCommitCbParam;
L
Liu Jicong 已提交
191

192 193 194 195 196
typedef struct {
  tmq_t*         tmq;
  int8_t         automatic;
  int8_t         async;
  int8_t         freeOffsets;
L
Liu Jicong 已提交
197 198
  int32_t        waitingRspNum;
  int32_t        totalRspNum;
L
Liu Jicong 已提交
199
  int32_t        rspErr;
200
  tmq_commit_cb* userCb;
L
Liu Jicong 已提交
201 202 203 204
  /*SArray*        successfulOffsets;*/
  /*SArray*        failedOffsets;*/
  void*  userParam;
  tsem_t rspSem;
205 206 207 208 209 210 211
} SMqCommitCbParamSet;

// Per-message parameter handed to tmqCommitCb2(); one is allocated for every
// commit-offset message sent by tmqCommitInner2().
typedef struct {
  SMqCommitCbParamSet* params;   // state shared by all messages of the same commit call
  STqOffset*           pOffset;  // offset record that was encoded into this message
} SMqCommitCbParam2;

212
tmq_conf_t* tmq_conf_new() {
wafwerar's avatar
wafwerar 已提交
213
  tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
214
  conf->withTbName = false;
L
Liu Jicong 已提交
215
  conf->autoCommit = true;
L
Liu Jicong 已提交
216
  conf->autoCommitInterval = 5000;
X
Xiaoyu Wang 已提交
217
  conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
218 219 220
  return conf;
}

L
Liu Jicong 已提交
221
// Releases a configuration created by tmq_conf_new(), including the
// heap-allocated ip/user/pass strings. A NULL argument is ignored.
void tmq_conf_destroy(tmq_conf_t* conf) {
  if (conf == NULL) return;

  // owned strings, released in the order ip, user, pass
  char* owned[3] = {conf->ip, conf->user, conf->pass};
  for (int32_t k = 0; k < 3; k++) {
    if (owned[k] != NULL) taosMemoryFree(owned[k]);
  }

  taosMemoryFree(conf);
}

tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
231 232
  if (strcmp(key, "group.id") == 0) {
    strcpy(conf->groupId, value);
L
Liu Jicong 已提交
233
    return TMQ_CONF_OK;
234
  }
L
Liu Jicong 已提交
235

236 237
  if (strcmp(key, "client.id") == 0) {
    strcpy(conf->clientId, value);
L
Liu Jicong 已提交
238 239
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
240

L
Liu Jicong 已提交
241 242
  if (strcmp(key, "enable.auto.commit") == 0) {
    if (strcmp(value, "true") == 0) {
L
Liu Jicong 已提交
243
      conf->autoCommit = true;
L
Liu Jicong 已提交
244 245
      return TMQ_CONF_OK;
    } else if (strcmp(value, "false") == 0) {
L
Liu Jicong 已提交
246
      conf->autoCommit = false;
L
Liu Jicong 已提交
247 248 249 250
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
251
  }
L
Liu Jicong 已提交
252

L
Liu Jicong 已提交
253 254 255 256 257
  if (strcmp(key, "auto.commit.interval.ms") == 0) {
    conf->autoCommitInterval = atoi(value);
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
258 259 260 261 262 263 264 265 266 267 268 269 270 271
  if (strcmp(key, "auto.offset.reset") == 0) {
    if (strcmp(value, "none") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__NONE;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "earliest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
      return TMQ_CONF_OK;
    } else if (strcmp(value, "latest") == 0) {
      conf->resetOffset = TMQ_CONF__RESET_OFFSET__LATEST;
      return TMQ_CONF_OK;
    } else {
      return TMQ_CONF_INVALID;
    }
  }
L
Liu Jicong 已提交
272

273 274
  if (strcmp(key, "msg.with.table.name") == 0) {
    if (strcmp(value, "true") == 0) {
275
      conf->withTbName = true;
L
Liu Jicong 已提交
276
      return TMQ_CONF_OK;
277
    } else if (strcmp(value, "false") == 0) {
278
      conf->withTbName = false;
L
Liu Jicong 已提交
279
      return TMQ_CONF_OK;
280 281 282 283 284
    } else {
      return TMQ_CONF_INVALID;
    }
  }

L
Liu Jicong 已提交
285
  if (strcmp(key, "td.connect.ip") == 0) {
L
Liu Jicong 已提交
286 287 288
    conf->ip = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
289
  if (strcmp(key, "td.connect.user") == 0) {
L
Liu Jicong 已提交
290 291 292
    conf->user = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
293
  if (strcmp(key, "td.connect.pass") == 0) {
L
Liu Jicong 已提交
294 295 296
    conf->pass = strdup(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
297
  if (strcmp(key, "td.connect.port") == 0) {
L
Liu Jicong 已提交
298 299 300
    conf->port = atoi(value);
    return TMQ_CONF_OK;
  }
L
Liu Jicong 已提交
301
  if (strcmp(key, "td.connect.db") == 0) {
302
    /*conf->db = strdup(value);*/
L
Liu Jicong 已提交
303 304 305
    return TMQ_CONF_OK;
  }

L
Liu Jicong 已提交
306
  return TMQ_CONF_UNKNOWN;
307 308 309
}

tmq_list_t* tmq_list_new() {
L
Liu Jicong 已提交
310 311
  //
  return (tmq_list_t*)taosArrayInit(0, sizeof(void*));
312 313
}

L
Liu Jicong 已提交
314 315 316
// Appends a heap-allocated copy of `src` to the list.
// Returns 0 on success, -1 when an argument is NULL, the copy cannot be allocated,
// or the array push fails (the copy is released in that case).
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
  if (list == NULL || src == NULL) return -1;

  SArray* container = &list->container;
  char*   topic = strdup(src);
  if (topic == NULL) return -1;

  if (taosArrayPush(container, &topic) == NULL) {
    // the array did not take ownership; free with the same routine tmq_list_destroy() uses
    taosMemoryFree(topic);
    return -1;
  }
  return 0;
}

L
Liu Jicong 已提交
321
// Destroys the list and frees every topic string stored in it.
// A NULL list is ignored, matching tmq_conf_destroy().
void tmq_list_destroy(tmq_list_t* list) {
  if (list == NULL) return;

  SArray* container = &list->container;
  taosArrayDestroyP(container, taosMemoryFree);
}

L
Liu Jicong 已提交
326 327 328 329 330 331 332 333 334 335
// Returns the number of topics stored in the list, as reported by taosArrayGetSize().
int32_t tmq_list_get_size(const tmq_list_t* list) {
  return taosArrayGetSize(&list->container);
}

// Exposes the list's backing storage (the array's pData) as a char** without copying.
// The pointer is the list's own buffer, not a new allocation.
char** tmq_list_to_c_array(const tmq_list_t* list) {
  const SArray* topics = &list->container;
  return topics->pData;
}

L
Liu Jicong 已提交
336 337 338 339
// Writes "<topicName>:<vg>" into dst and returns sprintf's result (the number of
// characters written, excluding the terminator).
// NOTE(review): no capacity is passed in, so dst must be large enough for the key.
static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
  const int32_t written = sprintf(dst, "%s:%d", topicName, vg);
  return written;
}

L
Liu Jicong 已提交
340 341
int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
  SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
L
Liu Jicong 已提交
342
  pParam->rspErr = code;
L
Liu Jicong 已提交
343 344
  if (pParam->async) {
    if (pParam->automatic && pParam->tmq->commitCb) {
L
Liu Jicong 已提交
345
      pParam->tmq->commitCb(pParam->tmq, pParam->rspErr, pParam->tmq->commitCbUserParam);
L
Liu Jicong 已提交
346
    } else if (!pParam->automatic && pParam->userCb) {
L
Liu Jicong 已提交
347
      pParam->userCb(pParam->tmq, pParam->rspErr, pParam->userParam);
L
Liu Jicong 已提交
348 349
    }

L
Liu Jicong 已提交
350
    if (pParam->freeOffsets) {
L
Liu Jicong 已提交
351 352 353 354 355 356 357 358 359 360
      taosArrayDestroy(pParam->offsets);
    }

    taosMemoryFree(pParam);
  } else {
    tsem_post(&pParam->rspSem);
  }
  return 0;
}

361 362 363 364
int32_t tmqCommitCb2(void* param, const SDataBuf* pBuf, int32_t code) {
  SMqCommitCbParam2*   pParam = (SMqCommitCbParam2*)param;
  SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
  // push into array
L
Liu Jicong 已提交
365
#if 0
366 367 368 369 370
  if (code == 0) {
    taosArrayPush(pParamSet->failedOffsets, &pParam->pOffset);
  } else {
    taosArrayPush(pParamSet->successfulOffsets, &pParam->pOffset);
  }
L
Liu Jicong 已提交
371
#endif
L
Liu Jicong 已提交
372

L
Liu Jicong 已提交
373 374 375
  /*tscDebug("receive offset commit cb of %s on vg %d, offset is %ld", pParam->pOffset->subKey, pParam->->vgId,
   * pOffset->version);*/

376
  // count down waiting rsp
L
Liu Jicong 已提交
377
  int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
378 379 380 381 382 383 384
  ASSERT(waitingRspNum >= 0);

  if (waitingRspNum == 0) {
    // if no more waiting rsp
    if (pParamSet->async) {
      // call async cb func
      if (pParamSet->automatic && pParamSet->tmq->commitCb) {
L
Liu Jicong 已提交
385
        pParamSet->tmq->commitCb(pParamSet->tmq, pParamSet->rspErr, pParamSet->tmq->commitCbUserParam);
386 387
      } else if (!pParamSet->automatic && pParamSet->userCb) {
        // sem post
L
Liu Jicong 已提交
388
        pParamSet->userCb(pParamSet->tmq, pParamSet->rspErr, pParamSet->userParam);
389
      }
L
Liu Jicong 已提交
390 391
    } else {
      tsem_post(&pParamSet->rspSem);
392 393
    }

L
Liu Jicong 已提交
394
#if 0
395 396
    taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
    taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
L
Liu Jicong 已提交
397
#endif
398 399 400 401
  }
  return 0;
}

L
Liu Jicong 已提交
402 403
int32_t tmqCommitInner2(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
                        void* userParam) {
404 405
  int32_t code = -1;

L
Liu Jicong 已提交
406
  if (msg != NULL) {
407 408 409 410 411 412 413 414 415 416 417
    char*   topic;
    int32_t vgId;
    if (TD_RES_TMQ(msg)) {
      SMqRspObj* pRspObj = (SMqRspObj*)msg;
      topic = pRspObj->topic;
      vgId = pRspObj->vgId;
    } else if (TD_RES_TMQ_META(msg)) {
      SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg;
      topic = pMetaRspObj->topic;
      vgId = pMetaRspObj->vgId;
    } else {
L
Liu Jicong 已提交
418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435
      return TSDB_CODE_TMQ_INVALID_MSG;
    }

    SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
    if (pParamSet == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
    pParamSet->tmq = tmq;
    pParamSet->automatic = automatic;
    pParamSet->async = async;
    pParamSet->freeOffsets = 1;
    pParamSet->userCb = userCb;
    pParamSet->userParam = userParam;
    tsem_init(&pParamSet->rspSem, 0, 0);

    for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
      SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
436
      if (strcmp(pTopic->topicName, topic) == 0) {
L
Liu Jicong 已提交
437 438
        for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
          SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
439
          if (pVg->vgId == vgId) {
L
Liu Jicong 已提交
440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536
            if (pVg->currentOffset < 0 || pVg->committedOffset == pVg->currentOffset) {
              tscDebug("consumer %ld skip commit for topic %s vg %d, current offset is %ld, committed offset is %ld",
                       tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->currentOffset, pVg->committedOffset);

              return 0;
            }

            STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
            if (pOffset == NULL) {
              terrno = TSDB_CODE_OUT_OF_MEMORY;
              return -1;
            }
            pOffset->type = TMQ_OFFSET__LOG;
            pOffset->version = pVg->currentOffset;

            int32_t groupLen = strlen(tmq->groupId);
            memcpy(pOffset->subKey, tmq->groupId, groupLen);
            pOffset->subKey[groupLen] = TMQ_SEPARATOR;
            strcpy(pOffset->subKey + groupLen + 1, pTopic->topicName);

            int32_t len;
            int32_t code;
            tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
            if (code < 0) {
              ASSERT(0);
            }
            void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
            ((SMsgHead*)buf)->vgId = htonl(pVg->vgId);

            void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

            SEncoder encoder;
            tEncoderInit(&encoder, abuf, len);
            tEncodeSTqOffset(&encoder, pOffset);

            // build param
            SMqCommitCbParam2* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam2));
            pParam->params = pParamSet;
            pParam->pOffset = pOffset;

            // build send info
            SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
            if (pMsgSendInfo == NULL) {
              // TODO
              continue;
            }
            pMsgSendInfo->msgInfo = (SDataBuf){
                .pData = buf,
                .len = sizeof(SMsgHead) + len,
                .handle = NULL,
            };

            tscDebug("consumer %ld commit offset of %s on vg %d, offset is %ld", tmq->consumerId, pOffset->subKey,
                     pVg->vgId, pOffset->version);

            // TODO: put into cb
            pVg->committedOffset = pVg->currentOffset;

            pMsgSendInfo->requestId = generateRequestId();
            pMsgSendInfo->requestObjRefId = 0;
            pMsgSendInfo->param = pParam;
            pMsgSendInfo->fp = tmqCommitCb2;
            pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET;
            // send msg

            int64_t transporterId = 0;
            asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
            pParamSet->waitingRspNum++;
            pParamSet->totalRspNum++;
          }
        }
      }
    }
    if (pParamSet->totalRspNum == 0) {
      tsem_destroy(&pParamSet->rspSem);
      taosMemoryFree(pParamSet);
      return 0;
    }

    if (!async) {
      tsem_wait(&pParamSet->rspSem);
      code = pParamSet->rspErr;
      tsem_destroy(&pParamSet->rspSem);
    } else {
      code = 0;
    }

    if (code != 0 && async) {
      if (automatic) {
        tmq->commitCb(tmq, code, tmq->commitCbUserParam);
      } else {
        userCb(tmq, code, userParam);
      }
    }
    return 0;
  }

537 538 539 540 541 542 543 544 545 546 547 548 549 550 551
  SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
  if (pParamSet == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }
  pParamSet->tmq = tmq;
  pParamSet->automatic = automatic;
  pParamSet->async = async;
  pParamSet->freeOffsets = 1;
  pParamSet->userCb = userCb;
  pParamSet->userParam = userParam;
  tsem_init(&pParamSet->rspSem, 0, 0);

  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
L
Liu Jicong 已提交
552 553 554 555

    tscDebug("consumer %ld begin commit for topic %s, vgNum %d", tmq->consumerId, pTopic->topicName,
             (int32_t)taosArrayGetSize(pTopic->vgs));

556
    for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
L
Liu Jicong 已提交
557 558 559 560 561 562 563 564 565
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);

      tscDebug("consumer %ld begin commit for topic %s, vgId %d", tmq->consumerId, pTopic->topicName, pVg->vgId);

      /*if (pVg->currentOffset < 0) {*/
      if (pVg->currentOffset < 0 || pVg->committedOffset == pVg->currentOffset) {
        tscDebug("consumer %ld skip commit for topic %s vg %d, current offset is %ld, committed offset is %ld",
                 tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->currentOffset, pVg->committedOffset);

L
Liu Jicong 已提交
566 567 568
        continue;
      }
      STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
569 570 571 572
      if (pOffset == NULL) {
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return -1;
      }
L
Liu Jicong 已提交
573 574
      pOffset->type = TMQ_OFFSET__LOG;
      pOffset->version = pVg->currentOffset;
L
Liu Jicong 已提交
575 576 577 578 579 580

      int32_t groupLen = strlen(tmq->groupId);
      memcpy(pOffset->subKey, tmq->groupId, groupLen);
      pOffset->subKey[groupLen] = TMQ_SEPARATOR;
      strcpy(pOffset->subKey + groupLen + 1, pTopic->topicName);

581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608
      int32_t len;
      int32_t code;
      tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
      if (code < 0) {
        ASSERT(0);
      }
      void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
      ((SMsgHead*)buf)->vgId = htonl(pVg->vgId);

      void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

      SEncoder encoder;
      tEncoderInit(&encoder, abuf, len);
      tEncodeSTqOffset(&encoder, pOffset);

      // build param
      SMqCommitCbParam2* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam2));
      pParam->params = pParamSet;
      pParam->pOffset = pOffset;

      // build send info
      SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
      if (pMsgSendInfo == NULL) {
        // TODO
        continue;
      }
      pMsgSendInfo->msgInfo = (SDataBuf){
          .pData = buf,
L
Liu Jicong 已提交
609
          .len = sizeof(SMsgHead) + len,
610 611 612
          .handle = NULL,
      };

L
Liu Jicong 已提交
613 614 615
      tscDebug("consumer %ld commit offset of %s on vg %d, offset is %ld", tmq->consumerId, pOffset->subKey, pVg->vgId,
               pOffset->version);

L
Liu Jicong 已提交
616 617 618
      // TODO: put into cb
      pVg->committedOffset = pVg->currentOffset;

619 620 621 622
      pMsgSendInfo->requestId = generateRequestId();
      pMsgSendInfo->requestObjRefId = 0;
      pMsgSendInfo->param = pParam;
      pMsgSendInfo->fp = tmqCommitCb2;
L
Liu Jicong 已提交
623
      pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET;
624 625 626
      // send msg

      int64_t transporterId = 0;
L
Liu Jicong 已提交
627
      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
L
Liu Jicong 已提交
628 629
      pParamSet->waitingRspNum++;
      pParamSet->totalRspNum++;
630 631 632
    }
  }

L
Liu Jicong 已提交
633 634 635 636 637 638
  if (pParamSet->totalRspNum == 0) {
    tsem_destroy(&pParamSet->rspSem);
    taosMemoryFree(pParamSet);
    return 0;
  }

639 640 641 642 643 644 645 646 647 648
  if (!async) {
    tsem_wait(&pParamSet->rspSem);
    code = pParamSet->rspErr;
    tsem_destroy(&pParamSet->rspSem);
  } else {
    code = 0;
  }

  if (code != 0 && async) {
    if (automatic) {
L
Liu Jicong 已提交
649
      tmq->commitCb(tmq, code, tmq->commitCbUserParam);
650
    } else {
L
Liu Jicong 已提交
651
      userCb(tmq, code, userParam);
652 653 654
    }
  }

L
Liu Jicong 已提交
655
#if 0
656 657 658 659
  if (!async) {
    taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
    taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
  }
L
Liu Jicong 已提交
660
#endif
661 662 663 664

  return 0;
}

L
Liu Jicong 已提交
665 666
#if 0
int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async,
L
Liu Jicong 已提交
667 668 669 670 671 672
                       tmq_commit_cb* userCb, void* userParam) {
  SMqCMCommitOffsetReq req;
  SArray*              pOffsets = NULL;
  void*                buf = NULL;
  SMqCommitCbParam*    pParam = NULL;
  SMsgSendInfo*        sendInfo = NULL;
L
Liu Jicong 已提交
673 674
  int8_t               freeOffsets;
  int32_t              code = -1;
L
Liu Jicong 已提交
675

L
Liu Jicong 已提交
676
  if (msg == NULL) {
L
Liu Jicong 已提交
677
    freeOffsets = 1;
L
Liu Jicong 已提交
678 679 680 681 682 683 684 685 686 687 688 689 690 691
    pOffsets = taosArrayInit(0, sizeof(SMqOffset));
    for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
      SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
      for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
        SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
        SMqOffset    offset;
        tstrncpy(offset.topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
        tstrncpy(offset.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
        offset.vgId = pVg->vgId;
        offset.offset = pVg->currentOffset;
        taosArrayPush(pOffsets, &offset);
      }
    }
  } else {
L
Liu Jicong 已提交
692
    freeOffsets = 0;
L
Liu Jicong 已提交
693
    pOffsets = (SArray*)&msg->container;
L
Liu Jicong 已提交
694 695 696 697 698
  }

  req.num = (int32_t)pOffsets->size;
  req.offsets = pOffsets->pData;

L
Liu Jicong 已提交
699 700 701 702
  SEncoder encoder;

  tEncoderInit(&encoder, NULL, 0);
  code = tEncodeSMqCMCommitOffsetReq(&encoder, &req);
L
Liu Jicong 已提交
703 704 705
  if (code < 0) {
    goto END;
  }
L
Liu Jicong 已提交
706
  int32_t tlen = encoder.pos;
L
Liu Jicong 已提交
707 708
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) {
L
Liu Jicong 已提交
709
    tEncoderClear(&encoder);
L
Liu Jicong 已提交
710 711
    goto END;
  }
L
Liu Jicong 已提交
712 713
  tEncoderClear(&encoder);

L
Liu Jicong 已提交
714 715 716 717 718 719 720 721 722 723 724 725
  tEncoderInit(&encoder, buf, tlen);
  tEncodeSMqCMCommitOffsetReq(&encoder, &req);
  tEncoderClear(&encoder);

  pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
  if (pParam == NULL) {
    goto END;
  }
  pParam->tmq = tmq;
  pParam->automatic = automatic;
  pParam->async = async;
  pParam->offsets = pOffsets;
L
Liu Jicong 已提交
726
  pParam->freeOffsets = freeOffsets;
L
Liu Jicong 已提交
727 728 729 730
  pParam->userCb = userCb;
  pParam->userParam = userParam;
  if (!async) tsem_init(&pParam->rspSem, 0, 0);

731
  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
L
Liu Jicong 已提交
732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754
  if (sendInfo == NULL) goto END;
  sendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqCommitCb;
  sendInfo->msgType = TDMT_MND_MQ_COMMIT_OFFSET;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  if (!async) {
    tsem_wait(&pParam->rspSem);
    code = pParam->rspErr;
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
L
Liu Jicong 已提交
755 756
  } else {
    code = 0;
L
Liu Jicong 已提交
757 758 759 760 761 762 763
  }

  // avoid double free if msg is sent
  buf = NULL;

END:
  if (buf) taosMemoryFree(buf);
L
Liu Jicong 已提交
764 765
  /*if (pParam) taosMemoryFree(pParam);*/
  /*if (sendInfo) taosMemoryFree(sendInfo);*/
L
Liu Jicong 已提交
766 767 768

  if (code != 0 && async) {
    if (automatic) {
L
Liu Jicong 已提交
769
      tmq->commitCb(tmq, code, (tmq_topic_vgroup_list_t*)pOffsets, tmq->commitCbUserParam);
L
Liu Jicong 已提交
770
    } else {
L
Liu Jicong 已提交
771
      userCb(tmq, code, (tmq_topic_vgroup_list_t*)pOffsets, userParam);
L
Liu Jicong 已提交
772 773 774
    }
  }

L
Liu Jicong 已提交
775
  if (!async && freeOffsets) {
L
Liu Jicong 已提交
776 777 778 779
    taosArrayDestroy(pOffsets);
  }
  return code;
}
L
Liu Jicong 已提交
780
#endif
L
Liu Jicong 已提交
781

L
Liu Jicong 已提交
782 783
void tmqAssignDelayedHbTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
784
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
L
Liu Jicong 已提交
785 786
  *pTaskType = TMQ_DELAYED_TASK__HB;
  taosWriteQitem(tmq->delayedTask, pTaskType);
787
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
788 789 790 791
}

void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
792
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
L
Liu Jicong 已提交
793 794
  *pTaskType = TMQ_DELAYED_TASK__COMMIT;
  taosWriteQitem(tmq->delayedTask, pTaskType);
795
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
796 797 798 799
}

void tmqAssignDelayedReportTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
800
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
L
Liu Jicong 已提交
801 802
  *pTaskType = TMQ_DELAYED_TASK__REPORT;
  taosWriteQitem(tmq->delayedTask, pTaskType);
803
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
804 805 806 807 808 809 810 811 812 813 814
}

// Drains the consumer's delayed-task queue and handles each item:
//   HB     -> tmqAskEp(tmq, true), then re-arm the heartbeat timer (1000)
//   COMMIT -> asynchronous automatic commit of all offsets, then re-arm the commit
//             timer with autoCommitInterval
//   REPORT -> nothing
// Any other value trips ASSERT(0). Every item is freed after handling. Always returns 0.
int32_t tmqHandleAllDelayedTask(tmq_t* tmq) {
  STaosQall* pending = taosAllocateQall();
  taosReadAllQitems(tmq->delayedTask, pending);

  for (;;) {
    // cleared on every pass so an exhausted batch is seen as NULL
    int8_t* pKind = NULL;
    taosGetQitem(pending, (void**)&pKind);
    if (pKind == NULL) break;

    switch (*pKind) {
      case TMQ_DELAYED_TASK__HB:
        tmqAskEp(tmq, true);
        taosTmrReset(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer, &tmq->hbTimer);
        break;
      case TMQ_DELAYED_TASK__COMMIT:
        tmqCommitInner2(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam);
        taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer, &tmq->commitTimer);
        break;
      case TMQ_DELAYED_TASK__REPORT:
        break;
      default:
        ASSERT(0);
        break;
    }

    taosFreeQitem(pKind);
  }

  taosFreeQall(pending);
  return 0;
}

L
Liu Jicong 已提交
830
// Discards every response that has not been handled yet: first the items already
// moved into tmq->qall, then whatever is still waiting in tmq->mqueue.
//
// The out-pointer is reset before every taosGetQitem() call, as tmqHandleAllDelayedTask()
// does, so a previously freed item can never be seen (and freed) again if the call
// leaves the pointer untouched when no item is available.
void tmqClearUnhandleMsg(tmq_t* tmq) {
  SMqRspWrapper* msg = NULL;

  while (1) {
    msg = NULL;
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg == NULL) break;
    taosFreeQitem(msg);
  }

  // pull the remaining queued responses into qall and drop them as well
  taosReadAllQitems(tmq->mqueue, tmq->qall);
  while (1) {
    msg = NULL;
    taosGetQitem(tmq->qall, (void**)&msg);
    if (msg == NULL) break;
    taosFreeQitem(msg);
  }
}

L
Liu Jicong 已提交
851 852 853
int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) {
  SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
  pParam->rspErr = code;
L
Liu Jicong 已提交
854
  /*tmq_t* tmq = pParam->tmq;*/
L
Liu Jicong 已提交
855 856 857
  tsem_post(&pParam->rspSem);
  return 0;
}
858

L
Liu Jicong 已提交
859
// Appends the names of the consumer's current topics to *topics, creating the list
// first when *topics is NULL.
//
// Each stored name is the text after the first '.' of the internal topic name
// (presumably a prefix such as the database/account - confirm); a name without '.'
// is appended unchanged.
//
// Returns 0 on success, -1 when an argument is NULL, the list cannot be created,
// or a name cannot be appended. Topics appended before a failure stay in the list.
int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
  if (tmq == NULL || topics == NULL) return -1;

  if (*topics == NULL) {
    *topics = tmq_list_new();
    if (*topics == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
  }

  for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
    SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i);

    // strchr() returns NULL when there is no '.'; the old `strchr(...) + 1` then
    // produced an invalid pointer
    const char* dot = strchr(topic->topicName, '.');
    const char* name = (dot != NULL) ? dot + 1 : topic->topicName;

    if (tmq_list_append(*topics, name) != 0) {
      return -1;
    }
  }
  return 0;
}

L
Liu Jicong 已提交
870 871 872
// Unsubscribes from all topics by subscribing to an empty topic list.
// Returns the result of tmq_subscribe(), or -1 (terrno = TSDB_CODE_OUT_OF_MEMORY)
// when the temporary list cannot be created.
int32_t tmq_unsubscribe(tmq_t* tmq) {
  tmq_list_t* lst = tmq_list_new();
  if (lst == NULL) {
    // previously a NULL list was passed on to tmq_subscribe() and tmq_list_destroy()
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  int32_t rsp = tmq_subscribe(tmq, lst);
  tmq_list_destroy(lst);
  return rsp;
}

L
Liu Jicong 已提交
877
#if 0
878
tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
wafwerar's avatar
wafwerar 已提交
879
  tmq_t* pTmq = taosMemoryCalloc(sizeof(tmq_t), 1);
880 881 882 883 884 885
  if (pTmq == NULL) {
    return NULL;
  }
  pTmq->pTscObj = (STscObj*)conn;
  pTmq->status = 0;
  pTmq->pollCnt = 0;
L
Liu Jicong 已提交
886
  pTmq->epoch = 0;
L
fix txn  
Liu Jicong 已提交
887
  pTmq->epStatus = 0;
L
temp  
Liu Jicong 已提交
888
  pTmq->epSkipCnt = 0;
L
Liu Jicong 已提交
889
  // set conf
890 891
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
L
Liu Jicong 已提交
892
  pTmq->autoCommit = conf->autoCommit;
893
  pTmq->commit_cb = conf->commit_cb;
L
Liu Jicong 已提交
894
  pTmq->resetOffsetCfg = conf->resetOffset;
L
Liu Jicong 已提交
895

L
Liu Jicong 已提交
896 897 898 899 900 901 902 903 904 905
  pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1);
  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  if (pTmq->clientTopics == NULL) {
    taosMemoryFree(pTmq);
    return NULL;
  }

  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();

L
Liu Jicong 已提交
906 907
  tsem_init(&pTmq->rspSem, 0, 0);

L
Liu Jicong 已提交
908 909
  return pTmq;
}
L
Liu Jicong 已提交
910
#endif
L
Liu Jicong 已提交
911

L
Liu Jicong 已提交
912
/*
 * Create a consumer from a configuration object.
 *
 * conf:      configuration; NULL is rejected. conf->groupId must be non-empty
 *            (asserted). Missing user/pass fall back to TSDB_DEFAULT_USER /
 *            TSDB_DEFAULT_PASS.
 * errstr,
 * errstrLen: accepted for API compatibility; this implementation does not
 *            write to errstr.
 *
 * On first use the shared TMQ timer (tmqMgmt.timer) is initialised. The
 * consumer gets its queues, a response semaphore and its own connection
 * (taos_connect_internal with CONN_TYPE__TMQ).
 *
 * Returns the new consumer, or NULL on failure. Allocation failures set
 * terrno to TSDB_CODE_OUT_OF_MEMORY; a connection failure leaves terrno as
 * set by taos_connect_internal.
 */
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
  if (conf == NULL) {
    return NULL;
  }

  // init timer
  int8_t inited = atomic_val_compare_exchange_8(&tmqMgmt.inited, 0, 1);
  if (inited == 0) {
    tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
    if (tmqMgmt.timer == NULL) {
      // allow a later call to retry the initialisation
      atomic_store_8(&tmqMgmt.inited, 0);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return NULL;
    }
  }

  tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
  if (pTmq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
  const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;

  ASSERT(user);
  ASSERT(pass);
  ASSERT(conf->groupId[0]);

  pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
  pTmq->mqueue = taosOpenQueue();
  pTmq->qall = taosAllocateQall();
  pTmq->delayedTask = taosOpenQueue();

  if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    goto FAIL;
  }

  // init status
  pTmq->status = TMQ_CONSUMER_STATUS__INIT;
  pTmq->pollCnt = 0;
  pTmq->epoch = 0;

  // set conf
  strcpy(pTmq->clientId, conf->clientId);
  strcpy(pTmq->groupId, conf->groupId);
  pTmq->withTbName = conf->withTbName;
  pTmq->autoCommit = conf->autoCommit;
  pTmq->autoCommitInterval = conf->autoCommitInterval;
  pTmq->commitCb = conf->commitCb;
  pTmq->commitCbUserParam = conf->commitCbUserParam;
  pTmq->resetOffsetCfg = conf->resetOffset;

  // assign consumerId
  pTmq->consumerId = tGenIdPI64();

  // init semaphore
  if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
    goto FAIL;
  }

  // init connection
  pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
  if (pTmq->pTscObj == NULL) {
    // the semaphore exists only from this point on, so it is destroyed here
    // rather than in the shared FAIL path
    tsem_destroy(&pTmq->rspSem);
    goto FAIL;
  }

  return pTmq;

FAIL:
  if (pTmq->clientTopics) taosArrayDestroy(pTmq->clientTopics);
  if (pTmq->mqueue) taosCloseQueue(pTmq->mqueue);
  if (pTmq->delayedTask) taosCloseQueue(pTmq->delayedTask);
  if (pTmq->qall) taosFreeQall(pTmq->qall);
  taosMemoryFree(pTmq);
  return NULL;
}

L
Liu Jicong 已提交
988 989
#if 0
int32_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) {
L
Liu Jicong 已提交
990
  return tmqCommitInner2(tmq, offsets, 0, async, tmq->commitCb, tmq->commitCbUserParam);
L
Liu Jicong 已提交
991
}
L
Liu Jicong 已提交
992
#endif
L
Liu Jicong 已提交
993

L
Liu Jicong 已提交
994
/*
 * Subscribe the consumer to the topics in topic_list (an empty list is what
 * tmq_unsubscribe() passes).
 *
 * Builds an SCMSubscribeReq with the full name of every topic, sends it to the
 * mnode (TDMT_MND_SUBSCRIBE) and blocks until tmqSubscribeCb reports the
 * result. On success it polls tmqAskEp() until the consumer is no longer
 * reported as "not ready", then starts the heartbeat timer and, when
 * auto-commit is enabled, the commit timer (each only if not already started).
 *
 * Returns 0 on success, the response error code if the server rejected the
 * request, or -1 on a local failure.
 */
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
  const SArray*   container = &topic_list->container;
  int32_t         sz = taosArrayGetSize(container);
  void*           buf = NULL;
  SMsgSendInfo*   sendInfo = NULL;
  SCMSubscribeReq req = {0};
  int32_t         code = -1;

  req.consumerId = tmq->consumerId;
  tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
  req.topicNames = taosArrayInit(sz, sizeof(void*));
  if (req.topicNames == NULL) goto FAIL;

  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(container, i);

    SName name = {0};
    tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));

    char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
    if (topicFName == NULL) {
      goto FAIL;
    }
    tNameExtractFullName(&name, topicFName);

    tscDebug("subscribe topic: %s", topicFName);

    // the array owns the name only once the push succeeded
    if (taosArrayPush(req.topicNames, &topicFName) == NULL) {
      taosMemoryFree(topicFName);
      goto FAIL;
    }
  }

  int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);
  buf = taosMemoryMalloc(tlen);
  if (buf == NULL) goto FAIL;

  void* abuf = buf;
  tSerializeSCMSubscribeReq(&abuf, &req);

  sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) goto FAIL;

  // lives on this stack frame; valid for the callback because we block on
  // rspSem until the callback has run
  SMqSubscribeCbParam param = {
      .rspErr = 0,
      .tmq = tmq,
  };

  if (tsem_init(&param.rspSem, 0, 0) != 0) goto FAIL;

  sendInfo->msgInfo = (SDataBuf){
      .pData = buf,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = &param;
  sendInfo->fp = tmqSubscribeCb;
  sendInfo->msgType = TDMT_MND_SUBSCRIBE;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  // avoid double free if msg is sent: the request and its buffer are no
  // longer ours to release
  buf = NULL;
  sendInfo = NULL;

  tsem_wait(&param.rspSem);
  tsem_destroy(&param.rspSem);

  code = param.rspErr;
  if (code != 0) goto FAIL;

  while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
    tscDebug("consumer not ready, retry");
    taosMsleep(500);
  }

  // init hb timer
  if (tmq->hbTimer == NULL) {
    tmq->hbTimer = taosTmrStart(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer);
  }

  // init auto commit timer
  if (tmq->autoCommit && tmq->commitTimer == NULL) {
    tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer);
  }

  code = 0;
FAIL:
  if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree);
  // non-NULL only when we bailed out before the request was handed over
  if (sendInfo != NULL) taosMemoryFree(sendInfo);
  if (code != 0 && buf) {
    taosMemoryFree(buf);
  }
  return code;
}

L
Liu Jicong 已提交
1090
/*
 * Register the commit callback and its user parameter in the configuration;
 * tmq_consumer_new() copies both into the consumer it creates.
 */
void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* param) {
  conf->commitCbUserParam = param;
  conf->commitCb = cb;
}
1095

L
Liu Jicong 已提交
1096
#if 0
L
Liu Jicong 已提交
1097 1098
int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) {
  if (tmq_message == NULL) return 0;
L
Liu Jicong 已提交
1099
  SMqPollRsp* pRsp = &tmq_message->msg;
L
Liu Jicong 已提交
1100 1101
  return pRsp->skipLogNum;
}
L
Liu Jicong 已提交
1102
#endif
L
Liu Jicong 已提交
1103 1104

int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
X
Xiaoyu Wang 已提交
1105 1106
  SMqPollCbParam* pParam = (SMqPollCbParam*)param;
  SMqClientVg*    pVg = pParam->pVg;
L
Liu Jicong 已提交
1107
  SMqClientTopic* pTopic = pParam->pTopic;
X
Xiaoyu Wang 已提交
1108
  tmq_t*          tmq = pParam->tmq;
L
Liu Jicong 已提交
1109 1110 1111
  int32_t         vgId = pParam->vgId;
  int32_t         epoch = pParam->epoch;
  taosMemoryFree(pParam);
L
Liu Jicong 已提交
1112
  if (code != 0) {
L
Liu Jicong 已提交
1113 1114
    tscWarn("msg discard from vg %d, epoch %d, code:%x", vgId, epoch, code);
    if (pMsg->pData) taosMemoryFree(pMsg->pData);
L
Liu Jicong 已提交
1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127
    if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
      if (pRspWrapper == NULL) {
        taosMemoryFree(pMsg->pData);
        tscWarn("msg discard from vg %d, epoch %d since out of memory", vgId, epoch);
        goto CREATE_MSG_FAIL;
      }
      pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
      /*pRspWrapper->vgHandle = pVg;*/
      /*pRspWrapper->topicHandle = pTopic;*/
      taosWriteQitem(tmq->mqueue, pRspWrapper);
      tsem_post(&tmq->rspSem);
    }
L
fix txn  
Liu Jicong 已提交
1128
    goto CREATE_MSG_FAIL;
L
Liu Jicong 已提交
1129 1130
  }

X
Xiaoyu Wang 已提交
1131 1132 1133
  int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
  int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
  if (msgEpoch < tmqEpoch) {
L
Liu Jicong 已提交
1134
    // do not write into queue since updating epoch reset
L
Liu Jicong 已提交
1135
    tscWarn("msg discard from vg %d since from earlier epoch, rsp epoch %d, current epoch %d", vgId, msgEpoch,
L
Liu Jicong 已提交
1136
            tmqEpoch);
1137
    tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
1138
    taosMemoryFree(pMsg->pData);
X
Xiaoyu Wang 已提交
1139 1140 1141 1142
    return 0;
  }

  if (msgEpoch != tmqEpoch) {
L
Liu Jicong 已提交
1143
    tscWarn("mismatch rsp from vg %d, epoch %d, current epoch %d", vgId, msgEpoch, tmqEpoch);
X
Xiaoyu Wang 已提交
1144 1145
  }

L
Liu Jicong 已提交
1146 1147 1148 1149 1150
  // handle meta rsp
  int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;
  if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
  }

1151
  SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
L
Liu Jicong 已提交
1152
  if (pRspWrapper == NULL) {
L
Liu Jicong 已提交
1153 1154
    taosMemoryFree(pMsg->pData);
    tscWarn("msg discard from vg %d, epoch %d since out of memory", vgId, epoch);
L
fix txn  
Liu Jicong 已提交
1155
    goto CREATE_MSG_FAIL;
L
Liu Jicong 已提交
1156
  }
L
Liu Jicong 已提交
1157

L
Liu Jicong 已提交
1158
  pRspWrapper->tmqRspType = rspType;
L
Liu Jicong 已提交
1159 1160
  pRspWrapper->vgHandle = pVg;
  pRspWrapper->topicHandle = pTopic;
L
Liu Jicong 已提交
1161

L
Liu Jicong 已提交
1162 1163 1164 1165 1166 1167 1168 1169
  memcpy(&pRspWrapper->dataRsp, pMsg->pData, sizeof(SMqRspHead));

  if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
    tDecodeSMqDataBlkRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->dataRsp);
  } else {
    ASSERT(rspType == TMQ_MSG_TYPE__POLL_META_RSP);
    tDecodeSMqMetaRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->metaRsp);
  }
L
Liu Jicong 已提交
1170

L
Liu Jicong 已提交
1171
  taosMemoryFree(pMsg->pData);
L
Liu Jicong 已提交
1172

L
Liu Jicong 已提交
1173
  tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld", tmq->consumerId, pVg->vgId,
L
Liu Jicong 已提交
1174
           pRspWrapper->dataRsp.reqOffset, pRspWrapper->dataRsp.rspOffset);
L
fix  
Liu Jicong 已提交
1175

L
Liu Jicong 已提交
1176
  taosWriteQitem(tmq->mqueue, pRspWrapper);
1177
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
1178

L
Liu Jicong 已提交
1179
  return 0;
L
fix txn  
Liu Jicong 已提交
1180
CREATE_MSG_FAIL:
L
Liu Jicong 已提交
1181
  if (epoch == tmq->epoch) {
L
Liu Jicong 已提交
1182 1183
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
  }
1184
  tsem_post(&tmq->rspSem);
L
Liu Jicong 已提交
1185
  return -1;
1186 1187
}

1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215
/*
 * Replace the consumer's topic/vgroup layout with the one in pRsp and move the
 * consumer to the given epoch.
 *
 * The current offset of every known vgroup is remembered in a hash keyed by
 * "<topicName>:<vgId>". Each vgroup of the new layout starts from its
 * remembered offset if there is one, otherwise from tmq->resetOffsetCfg.
 * Afterwards the status becomes NO_TOPIC or READY depending on whether any
 * topic is left, and tmq->epoch is stored.
 *
 * Returns true if at least one vgroup was installed. Returns false when
 * nothing was installed or when an allocation failed; on allocation failure
 * the consumer's topics, status and epoch are left untouched and terrno is set
 * to TSDB_CODE_OUT_OF_MEMORY.
 *
 * NOTE(review): the replaced tmq->clientTopics array is destroyed without
 * releasing each old entry's topicName and vgs. That looks like a leak, but
 * poll callbacks keep pointers to vgroup entries, so confirm lifetimes before
 * freeing them here.
 */
bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
  bool set = false;

  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];
  tscDebug("consumer %ld update ep epoch %d to epoch %d, topic num: %d", tmq->consumerId, tmq->epoch, epoch,
           topicNumGet);

  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    return false;
  }

  SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pHash == NULL) {
    taosArrayDestroy(newTopics);
    return false;
  }

  // remember the current offset of every vgroup of every current topic
  int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < topicNumCur; i++) {
    SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
    if (pTopicCur->vgs) {
      int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
      tscDebug("consumer %ld new vg num: %d", tmq->consumerId, vgNumCur);
      for (int32_t j = 0; j < vgNumCur; j++) {
        SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
        snprintf(vgKey, sizeof(vgKey), "%s:%d", pTopicCur->topicName, pVgCur->vgId);
        tscDebug("consumer %ld epoch %d vg %d vgKey is %s, offset is %ld", tmq->consumerId, epoch, pVgCur->vgId, vgKey,
                 pVgCur->currentOffset);
        taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t));
      }
    }
  }

  // build the new layout
  for (int32_t i = 0; i < topicNumGet; i++) {
    SMqClientTopic topic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
    topic.schema = pTopicEp->schema;
    topic.topicName = strdup(pTopicEp->topic);
    if (topic.topicName == NULL) {
      goto OOM;
    }
    tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);

    tscDebug("consumer %ld update topic: %s", tmq->consumerId, topic.topicName);

    int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
    topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
    if (topic.vgs == NULL) {
      // this topic has not been pushed yet, release it here
      taosMemoryFree(topic.topicName);
      goto OOM;
    }
    for (int32_t j = 0; j < vgNumGet; j++) {
      SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
      snprintf(vgKey, sizeof(vgKey), "%s:%d", topic.topicName, pVgEp->vgId);
      int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
      int64_t  offset = tmq->resetOffsetCfg;
      if (pOffset != NULL) {
        offset = *pOffset;
      }

      tscDebug("consumer %ld(epoch %d) offset of vg %d updated to %ld, vgKey is %s", tmq->consumerId, epoch,
               pVgEp->vgId, offset, vgKey);
      SMqClientVg clientVg = {
          .pollCnt = 0,
          .currentOffset = offset,
          .vgId = pVgEp->vgId,
          .epSet = pVgEp->epSet,
          .vgStatus = TMQ_VG_STATUS__IDLE,
          .vgSkipCnt = 0,
      };
      taosArrayPush(topic.vgs, &clientVg);
      set = true;
    }
    taosArrayPush(newTopics, &topic);
  }

  if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
  taosHashCleanup(pHash);
  tmq->clientTopics = newTopics;

  if (taosArrayGetSize(tmq->clientTopics) == 0)
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
  else
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);

  atomic_store_32(&tmq->epoch, epoch);
  return set;

OOM:
  // nothing built so far has been published; release it all
  for (int32_t i = 0; i < taosArrayGetSize(newTopics); i++) {
    SMqClientTopic* pBuilt = taosArrayGet(newTopics, i);
    if (pBuilt->topicName) taosMemoryFree(pBuilt->topicName);
    if (pBuilt->vgs) taosArrayDestroy(pBuilt->vgs);
  }
  taosArrayDestroy(newTopics);
  taosHashCleanup(pHash);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return false;
}

L
Liu Jicong 已提交
1271
#if 1
L
Liu Jicong 已提交
1272
/*
 * Replace the consumer's topic/vgroup layout with the one in pRsp and move the
 * consumer to the given epoch.
 *
 * For every topic in pRsp the hash is cleared and refilled with the current
 * offsets of the same-named topic the consumer already has (key
 * "<topicName>:<vgId>"). Each vgroup of the new layout starts from pVgEp->offset
 * unless a current offset was found for it. Afterwards the status becomes
 * NO_TOPIC or READY depending on whether any topic is left, and tmq->epoch is
 * stored.
 *
 * Returns true if at least one vgroup was installed. Returns false when
 * nothing was installed or when an allocation failed; on allocation failure
 * the consumer's topics, status and epoch are left untouched and terrno is set
 * to TSDB_CODE_OUT_OF_MEMORY.
 *
 * NOTE(review): the replaced tmq->clientTopics array is destroyed without
 * releasing each old entry's topicName and vgs. That looks like a leak, but
 * poll callbacks keep pointers to vgroup entries, so confirm lifetimes before
 * freeing them here.
 */
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
  bool    set = false;
  int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
  char    vgKey[TSDB_TOPIC_FNAME_LEN + 22];
  tscDebug("consumer %ld update ep epoch %d to epoch %d, topic num: %d", tmq->consumerId, tmq->epoch, epoch,
           topicNumGet);

  SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
  if (newTopics == NULL) {
    return false;
  }
  SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
  if (pHash == NULL) {
    taosArrayDestroy(newTopics);
    return false;
  }

  // find topic, build hash
  for (int32_t i = 0; i < topicNumGet; i++) {
    SMqClientTopic topic = {0};
    SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
    topic.schema = pTopicEp->schema;
    taosHashClear(pHash);
    topic.topicName = strdup(pTopicEp->topic);
    if (topic.topicName == NULL) {
      goto OOM;
    }
    tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN);

    tscDebug("consumer %ld update topic: %s", tmq->consumerId, topic.topicName);

    // collect the offsets of the matching topic the consumer currently has
    int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
    for (int32_t j = 0; j < topicNumCur; j++) {
      SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j);
      if (pTopicCur->vgs && strcmp(pTopicCur->topicName, pTopicEp->topic) == 0) {
        int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
        tscDebug("consumer %ld new vg num: %d", tmq->consumerId, vgNumCur);
        if (vgNumCur == 0) break;
        for (int32_t k = 0; k < vgNumCur; k++) {
          SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, k);
          snprintf(vgKey, sizeof(vgKey), "%s:%d", topic.topicName, pVgCur->vgId);
          tscDebug("consumer %ld epoch %d vg %d build %s", tmq->consumerId, epoch, pVgCur->vgId, vgKey);
          taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t));
        }
        break;
      }
    }

    int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
    topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
    if (topic.vgs == NULL) {
      // this topic has not been pushed yet, release it here
      taosMemoryFree(topic.topicName);
      goto OOM;
    }
    for (int32_t j = 0; j < vgNumGet; j++) {
      SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
      snprintf(vgKey, sizeof(vgKey), "%s:%d", topic.topicName, pVgEp->vgId);
      int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
      int64_t  offset = pVgEp->offset;
      tscDebug("consumer %ld(epoch %d) original offset of vg %d is %ld", tmq->consumerId, epoch, pVgEp->vgId, offset);
      if (pOffset != NULL) {
        offset = *pOffset;
        tscDebug("consumer %ld(epoch %d) receive offset of vg %d, full key is %s", tmq->consumerId, epoch, pVgEp->vgId,
                 vgKey);
      }
      tscDebug("consumer %ld(epoch %d) offset of vg %d updated to %ld", tmq->consumerId, epoch, pVgEp->vgId, offset);
      SMqClientVg clientVg = {
          .pollCnt = 0,
          .currentOffset = offset,
          .vgId = pVgEp->vgId,
          .epSet = pVgEp->epSet,
          .vgStatus = TMQ_VG_STATUS__IDLE,
          .vgSkipCnt = 0,
      };
      taosArrayPush(topic.vgs, &clientVg);
      set = true;
    }
    taosArrayPush(newTopics, &topic);
  }

  if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
  taosHashCleanup(pHash);
  tmq->clientTopics = newTopics;

  if (taosArrayGetSize(tmq->clientTopics) == 0)
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);
  else
    atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);

  atomic_store_32(&tmq->epoch, epoch);
  return set;

OOM:
  // nothing built so far has been published; release it all
  for (int32_t i = 0; i < taosArrayGetSize(newTopics); i++) {
    SMqClientTopic* pBuilt = taosArrayGet(newTopics, i);
    if (pBuilt->topicName) taosMemoryFree(pBuilt->topicName);
    if (pBuilt->vgs) taosArrayDestroy(pBuilt->vgs);
  }
  taosArrayDestroy(newTopics);
  taosHashCleanup(pHash);
  terrno = TSDB_CODE_OUT_OF_MEMORY;
  return false;
}
L
Liu Jicong 已提交
1356
#endif
X
Xiaoyu Wang 已提交
1357

L
Liu Jicong 已提交
1358
int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
1359
  SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
L
Liu Jicong 已提交
1360
  tmq_t*           tmq = pParam->tmq;
L
Liu Jicong 已提交
1361
  int8_t           async = pParam->async;
L
Liu Jicong 已提交
1362
  pParam->code = code;
1363
  if (code != 0) {
L
Liu Jicong 已提交
1364
    tscError("consumer %ld get topic endpoint error, not ready, wait:%d", tmq->consumerId, pParam->async);
L
Liu Jicong 已提交
1365
    goto END;
1366
  }
L
Liu Jicong 已提交
1367

L
Liu Jicong 已提交
1368
  // tmq's epoch is monotonically increase,
L
Liu Jicong 已提交
1369
  // so it's safe to discard any old epoch msg.
L
Liu Jicong 已提交
1370
  // Epoch will only increase when received newer epoch ep msg
L
Liu Jicong 已提交
1371 1372
  SMqRspHead* head = pMsg->pData;
  int32_t     epoch = atomic_load_32(&tmq->epoch);
L
temp  
Liu Jicong 已提交
1373
  tscDebug("consumer %ld recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
L
Liu Jicong 已提交
1374 1375
  if (head->epoch <= epoch) {
    goto END;
1376
  }
L
Liu Jicong 已提交
1377

L
Liu Jicong 已提交
1378
  if (!async) {
L
Liu Jicong 已提交
1379 1380
    SMqAskEpRsp rsp;
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
X
Xiaoyu Wang 已提交
1381 1382
    /*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/
    /*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/
L
Liu Jicong 已提交
1383
    tmqUpdateEp2(tmq, head->epoch, &rsp);
L
Liu Jicong 已提交
1384
    tDeleteSMqAskEpRsp(&rsp);
X
Xiaoyu Wang 已提交
1385
  } else {
1386
    SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM);
L
Liu Jicong 已提交
1387
    if (pWrapper == NULL) {
X
Xiaoyu Wang 已提交
1388
      terrno = TSDB_CODE_OUT_OF_MEMORY;
L
Liu Jicong 已提交
1389 1390
      code = -1;
      goto END;
X
Xiaoyu Wang 已提交
1391
    }
L
Liu Jicong 已提交
1392 1393 1394
    pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
    pWrapper->epoch = head->epoch;
    memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
L
Liu Jicong 已提交
1395
    tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);
L
Liu Jicong 已提交
1396

L
Liu Jicong 已提交
1397
    taosWriteQitem(tmq->mqueue, pWrapper);
1398
    tsem_post(&tmq->rspSem);
1399
  }
L
Liu Jicong 已提交
1400 1401

END:
L
Liu Jicong 已提交
1402
  /*atomic_store_8(&tmq->epStatus, 0);*/
L
Liu Jicong 已提交
1403
  if (!async) {
L
Liu Jicong 已提交
1404
    tsem_post(&pParam->rspSem);
L
Liu Jicong 已提交
1405 1406
  } else {
    taosMemoryFree(pParam);
L
Liu Jicong 已提交
1407 1408
  }
  return code;
1409 1410
}

L
Liu Jicong 已提交
1411
/*
 * Ask the mnode for the consumer's current topic/vgroup endpoints
 * (TDMT_MND_MQ_ASK_EP); the response is handled by tmqAskEpCb.
 *
 * async == false: blocks until the callback has run and returns the code it
 *                 stored in the callback parameter.
 * async == true:  returns 0 right after sending; the callback owns and frees
 *                 the callback parameter.
 *
 * Returns -1 if the request could not be set up locally.
 */
int32_t tmqAskEp(tmq_t* tmq, bool async) {
  int32_t      code = 0;
  int32_t      tlen = sizeof(SMqAskEpReq);
  SMqAskEpReq* req = taosMemoryCalloc(1, tlen);
  if (req == NULL) {
    tscError("failed to malloc get subscribe ep buf");
    return -1;
  }
  req->consumerId = htobe64(tmq->consumerId);
  req->epoch = htonl(tmq->epoch);
  strcpy(req->cgroup, tmq->groupId);

  SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
  if (pParam == NULL) {
    tscError("failed to malloc subscribe param");
    taosMemoryFree(req);
    return -1;
  }
  pParam->tmq = tmq;
  pParam->async = async;
  // The semaphore is only used to wake the synchronous caller; the async
  // callback path frees pParam without ever touching it.
  if (!async && tsem_init(&pParam->rspSem, 0, 0) != 0) {
    taosMemoryFree(pParam);
    taosMemoryFree(req);
    return -1;
  }

  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (sendInfo == NULL) {
    if (!async) tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
    taosMemoryFree(req);
    return -1;
  }

  sendInfo->msgInfo = (SDataBuf){
      .pData = req,
      .len = tlen,
      .handle = NULL,
  };

  sendInfo->requestId = generateRequestId();
  sendInfo->requestObjRefId = 0;
  sendInfo->param = pParam;
  sendInfo->fp = tmqAskEpCb;
  sendInfo->msgType = TDMT_MND_MQ_ASK_EP;

  SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);

  tscDebug("consumer %ld ask ep", tmq->consumerId);

  int64_t transporterId = 0;
  asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

  if (!async) {
    tsem_wait(&pParam->rspSem);
    code = pParam->code;
    // release the semaphore before the memory that holds it
    tsem_destroy(&pParam->rspSem);
    taosMemoryFree(pParam);
  }
  return code;
}

L
Liu Jicong 已提交
1480 1481
#if 0
int32_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
L
Liu Jicong 已提交
1482 1483 1484 1485 1486 1487 1488 1489 1490 1491 1492 1493 1494
  const SMqOffset* pOffset = &offset->offset;
  if (strcmp(pOffset->cgroup, tmq->groupId) != 0) {
    return TMQ_RESP_ERR__FAIL;
  }
  int32_t sz = taosArrayGetSize(tmq->clientTopics);
  for (int32_t i = 0; i < sz; i++) {
    SMqClientTopic* clientTopic = taosArrayGet(tmq->clientTopics, i);
    if (strcmp(clientTopic->topicName, pOffset->topicName) == 0) {
      int32_t vgSz = taosArrayGetSize(clientTopic->vgs);
      for (int32_t j = 0; j < vgSz; j++) {
        SMqClientVg* pVg = taosArrayGet(clientTopic->vgs, j);
        if (pVg->vgId == pOffset->vgId) {
          pVg->currentOffset = pOffset->offset;
L
Liu Jicong 已提交
1495
          tmqClearUnhandleMsg(tmq);
L
Liu Jicong 已提交
1496 1497 1498 1499 1500 1501 1502
          return TMQ_RESP_ERR__SUCCESS;
        }
      }
    }
  }
  return TMQ_RESP_ERR__FAIL;
}
L
Liu Jicong 已提交
1503
#endif
L
Liu Jicong 已提交
1504

1505
/*
 * Allocate and fill a poll request for one vgroup of one topic.
 *
 * The requested offset is the vgroup's current offset when that is
 * non-negative, otherwise the consumer's configured reset offset. The
 * subscription key is "<groupId><TMQ_SEPARATOR><topicName>".
 *
 * Returns the request, or NULL if it could not be allocated.
 */
SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
  int64_t startOffset = tmq->resetOffsetCfg;
  if (pVg->currentOffset >= 0) {
    startOffset = pVg->currentOffset;
  }

  SMqPollReq* pPollReq = taosMemoryCalloc(1, sizeof(SMqPollReq));
  if (pPollReq == NULL) {
    return NULL;
  }

  // subKey = group id, separator, topic name (NUL-terminated by strcpy)
  int32_t prefixLen = strlen(tmq->groupId);
  memcpy(pPollReq->subKey, tmq->groupId, prefixLen);
  pPollReq->subKey[prefixLen] = TMQ_SEPARATOR;
  strcpy(pPollReq->subKey + prefixLen + 1, pTopic->topicName);

  pPollReq->withTbName = tmq->withTbName;
  pPollReq->timeout = timeout;
  pPollReq->consumerId = tmq->consumerId;
  pPollReq->epoch = tmq->epoch;
  pPollReq->currentOffset = startOffset;
  pPollReq->reqId = generateRequestId();

  // message head fields are stored in network byte order
  pPollReq->head.vgId = htonl(pVg->vgId);
  pPollReq->head.contLen = htonl(sizeof(SMqPollReq));
  return pPollReq;
}

L
Liu Jicong 已提交
1542 1543 1544 1545 1546 1547 1548 1549 1550 1551 1552
/*
 * Build the user-visible meta response object from a queued poll response.
 *
 * Copies the topic name, db name and vgroup id from the wrapper's handles and
 * the meta response itself (shallow memcpy of SMqMetaRsp).
 *
 * Returns the new object, or NULL if it could not be allocated.
 */
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqMetaRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqMetaRspObj));
  if (pRspObj == NULL) {
    return NULL;
  }
  pRspObj->resType = RES_TYPE__TMQ;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;

  memcpy(&pRspObj->metaRsp, &pWrapper->metaRsp, sizeof(SMqMetaRsp));
  return pRspObj;
}

L
Liu Jicong 已提交
1553 1554 1555
/*
 * Build the user-visible data response object from a queued poll response.
 *
 * Copies the topic name, db name and vgroup id from the wrapper's handles and
 * the data response itself (shallow memcpy of SMqDataBlkRsp). The block
 * iterator starts at -1. When the response carries no schema of its own, the
 * topic's schema is installed into resInfo.
 *
 * Returns the new object, or NULL if it could not be allocated.
 */
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
  SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
  if (pRspObj == NULL) {
    return NULL;
  }
  pRspObj->resType = RES_TYPE__TMQ;
  tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
  tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
  pRspObj->vgId = pWrapper->vgHandle->vgId;
  pRspObj->resIter = -1;
  memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataBlkRsp));

  pRspObj->resInfo.totalRows = 0;
  pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
  if (!pWrapper->dataRsp.withSchema) {
    setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
  }

  return pRspObj;
}

1571
/*
 * Send one poll request to every idle vgroup of every topic the consumer
 * currently holds.
 *
 * A vgroup is claimed by atomically switching its status from IDLE to WAIT;
 * vgroups that are not idle are skipped and their skip counter is bumped.
 * For a claimed vgroup a poll request, a callback parameter and a send-info
 * record are created and handed to asyncSendMsgToServer() with tmqPollCb as
 * the completion callback.
 *
 * Returns 0 once all vgroups have been visited. Returns -1 as soon as the
 * request cannot be built or an allocation fails; in that case the vgroup is
 * put back to IDLE and tmq->rspSem is posted before returning.
 *
 * NOTE(review): the return value of asyncSendMsgToServer() is not checked
 * here — confirm whether a failed send can leave the vgroup stuck in WAIT.
 */
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
  for (int topicIdx = 0; topicIdx < taosArrayGetSize(tmq->clientTopics); topicIdx++) {
    SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, topicIdx);

    for (int vgIdx = 0; vgIdx < taosArrayGetSize(pTopic->vgs); vgIdx++) {
      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, vgIdx);

      // Claim the vgroup; anything other than IDLE means it cannot be polled now.
      if (atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT) !=
          TMQ_VG_STATUS__IDLE) {
        int32_t skipped = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
        tscTrace("consumer %ld epoch %d skip vg %d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId, skipped);
        continue;
      }
      atomic_store_32(&pVg->vgSkipCnt, 0);

      // Acquire the three resources in order; each step runs only if the previous one succeeded.
      SMqPollReq*     pReq = tmqBuildConsumeReqImpl(tmq, timeout, pTopic, pVg);
      SMqPollCbParam* pParam = NULL;
      SMsgSendInfo*   sendInfo = NULL;

      if (pReq != NULL) {
        pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
      }
      if (pParam != NULL) {
        sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
      }

      if (sendInfo == NULL) {
        // Shared failure path: release whatever was acquired, un-claim the vgroup, wake the poller.
        if (pReq != NULL) {
          taosMemoryFree(pReq);
        }
        if (pParam != NULL) {
          taosMemoryFree(pParam);
        }
        atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
        tsem_post(&tmq->rspSem);
        return -1;
      }

      pParam->tmq = tmq;
      pParam->pVg = pVg;
      pParam->pTopic = pTopic;
      pParam->vgId = pVg->vgId;
      pParam->epoch = tmq->epoch;

      sendInfo->msgInfo = (SDataBuf){
          .pData = pReq,
          .len = sizeof(SMqPollReq),
          .handle = NULL,
      };
      sendInfo->requestId = pReq->reqId;
      sendInfo->requestObjRefId = 0;
      sendInfo->param = pParam;
      sendInfo->fp = tmqPollCb;
      sendInfo->msgType = TDMT_VND_CONSUME;

      int64_t transporterId = 0;
      tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %ld, reqId %lu", tmq->consumerId,
               pTopic->topicName, pVg->vgId, tmq->epoch, pVg->currentOffset, pReq->reqId);
      asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);

      pVg->pollCnt++;
      tmq->pollCnt++;
    }
  }

  return 0;
}

L
Liu Jicong 已提交
1644 1645
/*
 * Handle a queued response that is not a poll response.
 *
 * Only TMQ_MSG_TYPE__EP_RSP is accepted. When the response carries an epoch
 * greater than the consumer's current epoch, the endpoint information is
 * applied through tmqUpdateEp2() and *pReset is set to true; otherwise
 * *pReset is set to false.
 *
 * Returns 0 for an endpoint response, -1 for any other type (in which case
 * *pReset is left untouched).
 */
int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
  if (rspWrapper->tmqRspType != TMQ_MSG_TYPE__EP_RSP) {
    return -1;
  }

  bool isNewer = rspWrapper->epoch > atomic_load_32(&tmq->epoch);
  if (isNewer) {
    SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
    tmqUpdateEp2(tmq, rspWrapper->epoch, &pEpRspWrapper->msg);
  }

  *pReset = isNewer;
  return 0;
}

1662
/*
 * Drain queued responses until one yields a result object.
 *
 * Items are taken from tmq->qall; when that is empty it is refilled from
 * tmq->mqueue. Returns NULL when both are empty.
 *
 *  - TMQ_MSG_TYPE__END_RSP: the item is freed, terrno is set to
 *    TSDB_CODE_TQ_NO_COMMITTED_OFFSET and NULL is returned.
 *  - TMQ_MSG_TYPE__POLL_RSP / TMQ_MSG_TYPE__POLL_META_RSP: if the response
 *    epoch equals the consumer epoch, the vgroup offset is advanced, the
 *    vgroup is marked IDLE and a response object built from the wrapper is
 *    returned. A POLL_RSP with zero blocks is consumed silently. Responses
 *    from another epoch are discarded.
 *  - anything else is passed to tmqHandleNoPollRsp(); if that reports a reset
 *    and pollIfReset is true, tmqPollImpl() is invoked again.
 */
SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
  for (;;) {
    SMqRspWrapper* rspWrapper = NULL;
    taosGetQitem(tmq->qall, (void**)&rspWrapper);
    if (rspWrapper == NULL) {
      taosReadAllQitems(tmq->mqueue, tmq->qall);
      taosGetQitem(tmq->qall, (void**)&rspWrapper);
      if (rspWrapper == NULL) {
        return NULL;
      }
    }

    const int8_t rspType = rspWrapper->tmqRspType;

    if (rspType == TMQ_MSG_TYPE__END_RSP) {
      taosFreeQitem(rspWrapper);
      terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
      return NULL;
    }

    if (rspType != TMQ_MSG_TYPE__POLL_RSP && rspType != TMQ_MSG_TYPE__POLL_META_RSP) {
      bool reset = false;
      tmqHandleNoPollRsp(tmq, rspWrapper, &reset);
      taosFreeQitem(rspWrapper);
      if (pollIfReset && reset) {
        tscDebug("consumer %ld reset and repoll", tmq->consumerId);
        tmqPollImpl(tmq, timeout);
      }
      continue;
    }

    // Data and meta poll responses share the same wrapper layout and handling.
    SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper;
    int32_t            consumerEpoch = atomic_load_32(&tmq->epoch);

    if (pollRspWrapper->dataRsp.head.epoch != consumerEpoch) {
      tscDebug("msg discard since epoch mismatch: msg epoch %d, consumer epoch %d\n",
               pollRspWrapper->dataRsp.head.epoch, consumerEpoch);
      taosFreeQitem(pollRspWrapper);
      continue;
    }

    SMqClientVg* pVg = pollRspWrapper->vgHandle;
    pVg->currentOffset = pollRspWrapper->dataRsp.rspOffset;
    atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);

    // Only data responses are dropped when they carry no blocks.
    if (rspType == TMQ_MSG_TYPE__POLL_RSP && pollRspWrapper->dataRsp.blockNum == 0) {
      taosFreeQitem(pollRspWrapper);
      continue;
    }

    SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper);
    taosFreeQitem(pollRspWrapper);
    return pRsp;
  }
}

1729
/*
 * Poll the consumer for the next result.
 *
 * Returns NULL immediately while the consumer is still in
 * TMQ_CONSUMER_STATUS__INIT. Otherwise it repeatedly runs delayed tasks,
 * issues poll requests and drains the response queue until a result object
 * is available, which is returned as a TAOS_RES*.
 *
 * NULL is returned when issuing poll requests fails, when the response
 * handler left terrno at TSDB_CODE_TQ_NO_COMMITTED_OFFSET, or when `timeout`
 * (compared against millisecond timestamps) has elapsed. A timeout of -1
 * means "no deadline"; the loop then re-checks at a fixed interval.
 *
 * NOTE(review): terrno is not cleared before tmqHandleAllRsp(), so a stale
 * TSDB_CODE_TQ_NO_COMMITTED_OFFSET from an earlier call would end the loop
 * early — confirm whether callers rely on that.
 */
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
  SMqRspObj* rspObj;
  int64_t    startTime = taosGetTimestampMs();

  // in no topic status also need process delayed task
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
    return NULL;
  }

  while (1) {
    tmqHandleAllDelayedTask(tmq);
    if (tmqPollImpl(tmq, timeout) < 0) return NULL;

    rspObj = tmqHandleAllRsp(tmq, timeout, false);
    if (rspObj) {
      return (TAOS_RES*)rspObj;
    } else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
      return NULL;
    }

    if (timeout != -1) {
      int64_t elapsedTime = taosGetTimestampMs() - startTime;
      if (elapsedTime > timeout) {
        tscDebug("consumer %ld (epoch %d) timeout, no rsp", tmq->consumerId, tmq->epoch);
        return NULL;
      }
      // Wait only for what is left of the budget. The previous code waited for
      // the time already spent, which let a call overshoot `timeout`.
      // The "* 1000" scaling of the millisecond value is kept as it was.
      tsem_timewait(&tmq->rspSem, (timeout - elapsedTime) * 1000);
    } else {
      // use tsem_timewait instead of tsem_wait to avoid unexpected stuck
      tsem_timewait(&tmq->rspSem, 500 * 1000);
    }
  }
}

L
Liu Jicong 已提交
1772
/*
 * Close a consumer.
 *
 * If the consumer is in TMQ_CONSUMER_STATUS__READY, offsets are committed
 * synchronously and the consumer then subscribes to an empty topic list.
 * The first non-zero result of either step is returned unchanged.
 *
 * Returns 0 on success (including when the consumer was not READY), and -1
 * if the temporary empty topic list cannot be created.
 */
int32_t tmq_consumer_close(tmq_t* tmq) {
  // Read the status atomically, consistent with tmq_consumer_poll().
  if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__READY) {
    int32_t rsp = tmq_commit_sync(tmq, NULL);
    if (rsp != 0) {
      return rsp;
    }

    tmq_list_t* lst = tmq_list_new();
    if (lst == NULL) {
      // Do not hand a NULL list to tmq_subscribe().
      return -1;
    }
    rsp = tmq_subscribe(tmq, lst);
    tmq_list_destroy(lst);

    if (rsp != 0) {
      return rsp;
    }
  }
  // TODO: free resources
  return 0;
}
L
Liu Jicong 已提交
1790

L
Liu Jicong 已提交
1791 1792
/*
 * Map a TMQ result code to a human-readable string.
 *
 * 0 yields "success", -1 yields "fail"; every other code is delegated to
 * tstrerror().
 */
const char* tmq_err2str(int32_t err) {
  switch (err) {
    case 0:
      return "success";
    case -1:
      return "fail";
    default:
      return tstrerror(err);
  }
}
L
Liu Jicong 已提交
1800

L
Liu Jicong 已提交
1801 1802 1803 1804 1805 1806 1807 1808 1809 1810
/*
 * Classify a result handle.
 *
 * Returns TMQ_RES_DATA when TD_RES_TMQ(res) holds, TMQ_RES_TABLE_META when
 * TD_RES_TMQ_META(res) holds, and TMQ_RES_INVALID otherwise.
 */
tmq_res_t tmq_get_res_type(TAOS_RES* res) {
  tmq_res_t resType = TMQ_RES_INVALID;

  if (TD_RES_TMQ(res)) {
    resType = TMQ_RES_DATA;
  } else if (TD_RES_TMQ_META(res)) {
    resType = TMQ_RES_TABLE_META;
  }

  return resType;
}

L
Liu Jicong 已提交
1811
/*
 * Return the topic name of a TMQ data or meta result.
 *
 * The stored name is returned starting after its first '.'. If the stored
 * name contains no '.', it is returned unchanged; the previous code computed
 * `strchr(...) + 1` on a NULL pointer in that case.
 *
 * Returns NULL when `res` is neither a TMQ data nor a TMQ meta result. The
 * returned pointer refers to storage inside `res`.
 */
const char* tmq_get_topic_name(TAOS_RES* res) {
  const char* fullName = NULL;

  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
    fullName = pRspObj->topic;
  } else if (TD_RES_TMQ_META(res)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
    fullName = pMetaRspObj->topic;
  } else {
    return NULL;
  }

  const char* dot = strchr(fullName, '.');
  return (dot != NULL) ? dot + 1 : fullName;
}

L
Liu Jicong 已提交
1823 1824 1825 1826
/*
 * Return the database name of a TMQ data or meta result.
 *
 * The stored name is returned starting after its first '.'. If the stored
 * name contains no '.', it is returned unchanged; the previous code computed
 * `strchr(...) + 1` on a NULL pointer in that case.
 *
 * Returns NULL when `res` is neither a TMQ data nor a TMQ meta result. The
 * returned pointer refers to storage inside `res`.
 */
const char* tmq_get_db_name(TAOS_RES* res) {
  const char* fullName = NULL;

  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;
    fullName = pRspObj->db;
  } else if (TD_RES_TMQ_META(res)) {
    SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
    fullName = pMetaRspObj->db;
  } else {
    return NULL;
  }

  const char* dot = strchr(fullName, '.');
  return (dot != NULL) ? dot + 1 : fullName;
}

L
Liu Jicong 已提交
1835 1836 1837 1838
/*
 * Return the vgroup id recorded in a TMQ data or meta result, or -1 when
 * `res` is neither kind of result.
 */
int32_t tmq_get_vgroup_id(TAOS_RES* res) {
  int32_t vgId = -1;

  if (TD_RES_TMQ(res)) {
    vgId = ((SMqRspObj*)res)->vgId;
  } else if (TD_RES_TMQ_META(res)) {
    vgId = ((SMqMetaRspObj*)res)->vgId;
  }

  return vgId;
}
L
Liu Jicong 已提交
1846 1847 1848 1849 1850 1851 1852 1853

/*
 * Return the table name of the block the result iterator currently points at.
 *
 * A name is returned only for a TMQ data result whose response was built
 * with table names (withTbName set and blockTbName non-NULL) and whose
 * resIter lies in [0, blockNum). In every other case NULL is returned.
 */
const char* tmq_get_table_name(TAOS_RES* res) {
  if (TD_RES_TMQ(res)) {
    SMqRspObj* pRspObj = (SMqRspObj*)res;

    bool hasNames = pRspObj->rsp.withTbName && pRspObj->rsp.blockTbName != NULL;
    bool iterInRange = pRspObj->resIter >= 0 && pRspObj->resIter < pRspObj->rsp.blockNum;

    if (hasNames && iterInRange) {
      return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter);
    }
  }

  return NULL;
}
1858

L
Liu Jicong 已提交
1859 1860 1861 1862 1863 1864 1865 1866 1867 1868
/*
 * Expose the raw meta payload of a TMQ meta result.
 *
 * For a meta result, *raw_meta and *raw_meta_len receive the payload pointer
 * and length stored in the result and 0 is returned. For any other result
 * the outputs are left untouched and -1 is returned.
 */
int32_t tmq_get_raw_meta(TAOS_RES* res, const void** raw_meta, int32_t* raw_meta_len) {
  int32_t code = -1;

  if (TD_RES_TMQ_META(res)) {
    SMqMetaRspObj* pMeta = (SMqMetaRspObj*)res;
    *raw_meta = pMeta->metaRsp.metaRsp;
    *raw_meta_len = pMeta->metaRsp.metaRspLen;
    code = 0;
  }

  return code;
}

L
Liu Jicong 已提交
1869 1870
/*
 * Commit without returning a result to the caller: forwards to
 * tmqCommitInner2() with `cb` and `param`, and discards its return value.
 *
 * NOTE(review): the literal arguments (0, 1) presumably select the
 * asynchronous mode of tmqCommitInner2 — confirm against its definition.
 */
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
  (void)tmqCommitInner2(tmq, msg, 0, 1, cb, param);
}

L
Liu Jicong 已提交
1873
/*
 * Commit and return the result of tmqCommitInner2(), called without a
 * callback or user parameter.
 *
 * NOTE(review): the literal arguments (0, 0) presumably select the
 * synchronous mode of tmqCommitInner2 — confirm against its definition.
 */
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) {
  int32_t code = tmqCommitInner2(tmq, msg, 0, 0, NULL, NULL);
  return code;
}