mndDef.c 20.4 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
L
Liu Jicong 已提交
17
#include "mndDef.h"
18 19
#include "mndConsumer.h"

L
Liu Jicong 已提交
20
// Serialize a stream object into pEncoder.
//
// Fields are written in a fixed order; tDecodeSStreamObj reads them back in
// the same order. A NULL sql/ast/physicalPlan is written as an empty string.
// Returns -1 as soon as any encoder call reports failure, otherwise
// pEncoder->pos after tEndEncode().
int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
  if (tStartEncode(pEncoder) < 0) return -1;

  // name, timestamps and versioning
  if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;
  if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1;
  if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1;
  if (tEncodeI32(pEncoder, pObj->version) < 0) return -1;
  if (tEncodeI32(pEncoder, pObj->totalLevel) < 0) return -1;
  if (tEncodeI64(pEncoder, pObj->smaId) < 0) return -1;

  // uid and status
  if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1;
  if (tEncodeI8(pEncoder, pObj->status) < 0) return -1;

  // trigger-related options
  if (tEncodeI8(pEncoder, pObj->igExpired) < 0) return -1;
  if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1;
  if (tEncodeI8(pEncoder, pObj->fillHistory) < 0) return -1;
  if (tEncodeI64(pEncoder, pObj->triggerParam) < 0) return -1;
  if (tEncodeI64(pEncoder, pObj->watermark) < 0) return -1;

  // source and target
  if (tEncodeI64(pEncoder, pObj->sourceDbUid) < 0) return -1;
  if (tEncodeI64(pEncoder, pObj->targetDbUid) < 0) return -1;
  if (tEncodeCStr(pEncoder, pObj->sourceDb) < 0) return -1;
  if (tEncodeCStr(pEncoder, pObj->targetDb) < 0) return -1;
  if (tEncodeCStr(pEncoder, pObj->targetSTbName) < 0) return -1;
  if (tEncodeI64(pEncoder, pObj->targetStbUid) < 0) return -1;
  if (tEncodeI32(pEncoder, pObj->fixedSinkVgId) < 0) return -1;

  // optional strings: a missing one is stored as ""
  const char *sqlText = (pObj->sql != NULL) ? pObj->sql : "";
  if (tEncodeCStr(pEncoder, sqlText) < 0) return -1;

  const char *astText = (pObj->ast != NULL) ? pObj->ast : "";
  if (tEncodeCStr(pEncoder, astText) < 0) return -1;

  const char *planText = (pObj->physicalPlan != NULL) ? pObj->physicalPlan : "";
  if (tEncodeCStr(pEncoder, planText) < 0) return -1;

  // tasks: an array of levels, each level an array of task pointers
  int32_t numLevels = taosArrayGetSize(pObj->tasks);
  if (tEncodeI32(pEncoder, numLevels) < 0) return -1;
  for (int32_t level = 0; level < numLevels; ++level) {
    SArray *pLevelTasks = taosArrayGetP(pObj->tasks, level);
    int32_t numTasks = taosArrayGetSize(pLevelTasks);
    if (tEncodeI32(pEncoder, numTasks) < 0) return -1;
    for (int32_t t = 0; t < numTasks; ++t) {
      SStreamTask *pTask = taosArrayGetP(pLevelTasks, t);
      if (tEncodeSStreamTask(pEncoder, pTask) < 0) return -1;
    }
  }

  if (tEncodeSSchemaWrapper(pEncoder, &pObj->outputSchema) < 0) return -1;

  // 3.0.20
  if (tEncodeI64(pEncoder, pObj->checkpointFreq) < 0) return -1;

  tEndEncode(pEncoder);
  return pEncoder->pos;
}

86
int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
L
Liu Jicong 已提交
87
  if (tStartDecode(pDecoder) < 0) return -1;
L
Liu Jicong 已提交
88
  if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1;
L
Liu Jicong 已提交
89

L
Liu Jicong 已提交
90 91 92
  if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
  if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1;
  if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1;
L
Liu Jicong 已提交
93
  if (tDecodeI32(pDecoder, &pObj->totalLevel) < 0) return -1;
L
Liu Jicong 已提交
94 95 96
  if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1;

  if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;
L
Liu Jicong 已提交
97
  if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1;
L
Liu Jicong 已提交
98

99
  if (tDecodeI8(pDecoder, &pObj->igExpired) < 0) return -1;
L
Liu Jicong 已提交
100
  if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1;
L
Liu Jicong 已提交
101
  if (tDecodeI8(pDecoder, &pObj->fillHistory) < 0) return -1;
102 103
  if (tDecodeI64(pDecoder, &pObj->triggerParam) < 0) return -1;
  if (tDecodeI64(pDecoder, &pObj->watermark) < 0) return -1;
L
Liu Jicong 已提交
104 105 106 107 108 109 110

  if (tDecodeI64(pDecoder, &pObj->sourceDbUid) < 0) return -1;
  if (tDecodeI64(pDecoder, &pObj->targetDbUid) < 0) return -1;
  if (tDecodeCStrTo(pDecoder, pObj->sourceDb) < 0) return -1;
  if (tDecodeCStrTo(pDecoder, pObj->targetDb) < 0) return -1;
  if (tDecodeCStrTo(pDecoder, pObj->targetSTbName) < 0) return -1;
  if (tDecodeI64(pDecoder, &pObj->targetStbUid) < 0) return -1;
L
Liu Jicong 已提交
111
  if (tDecodeI32(pDecoder, &pObj->fixedSinkVgId) < 0) return -1;
L
Liu Jicong 已提交
112

L
Liu Jicong 已提交
113
  if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1;
L
Liu Jicong 已提交
114
  if (tDecodeCStrAlloc(pDecoder, &pObj->ast) < 0) return -1;
L
Liu Jicong 已提交
115
  if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1;
L
Liu Jicong 已提交
116

L
Liu Jicong 已提交
117 118 119 120 121 122 123 124 125 126 127
  pObj->tasks = NULL;
  int32_t sz;
  if (tDecodeI32(pDecoder, &sz) < 0) return -1;
  if (sz != 0) {
    pObj->tasks = taosArrayInit(sz, sizeof(void *));
    for (int32_t i = 0; i < sz; i++) {
      int32_t innerSz;
      if (tDecodeI32(pDecoder, &innerSz) < 0) return -1;
      SArray *pArray = taosArrayInit(innerSz, sizeof(void *));
      for (int32_t j = 0; j < innerSz; j++) {
        SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
128 129 130 131
        if (pTask == NULL) {
          taosArrayDestroy(pArray);
          return -1;
        }
L
Liu Jicong 已提交
132 133
        if (tDecodeSStreamTask(pDecoder, pTask) < 0) {
          taosMemoryFree(pTask);
134
          taosArrayDestroy(pArray);
L
Liu Jicong 已提交
135 136
          return -1;
        }
L
Liu Jicong 已提交
137 138 139 140 141 142 143
        taosArrayPush(pArray, &pTask);
      }
      taosArrayPush(pObj->tasks, &pArray);
    }
  }

  if (tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema) < 0) return -1;
L
Liu Jicong 已提交
144

145
  // 3.0.20
146 147 148
  if (sver >= 2) {
    if (tDecodeI64(pDecoder, &pObj->checkpointFreq) < 0) return -1;
  }
L
Liu Jicong 已提交
149
  tEndDecode(pDecoder);
L
Liu Jicong 已提交
150 151 152
  return 0;
}

L
Liu Jicong 已提交
153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
// Release everything a stream object owns: its sql/ast/physicalPlan strings,
// the output schema columns (only when nCols is non-zero), every task in every
// level, the per-level arrays and the top-level tasks array.
// The SStreamObj structure itself is not freed here.
void tFreeStreamObj(SStreamObj *pStream) {
  taosMemoryFree(pStream->sql);
  taosMemoryFree(pStream->ast);
  taosMemoryFree(pStream->physicalPlan);
  if (pStream->outputSchema.nCols) {
    taosMemoryFree(pStream->outputSchema.pSchema);
  }

  int32_t numLevels = taosArrayGetSize(pStream->tasks);
  for (int32_t level = 0; level < numLevels; ++level) {
    SArray *pLevelTasks = taosArrayGetP(pStream->tasks, level);
    int32_t numTasks = taosArrayGetSize(pLevelTasks);
    for (int32_t t = 0; t < numTasks; ++t) {
      tFreeSStreamTask((SStreamTask *)taosArrayGetP(pLevelTasks, t));
    }
    taosArrayDestroy(pLevelTasks);
  }
  taosArrayDestroy(pStream->tasks);
}

L
Liu Jicong 已提交
172 173 174 175 176 177 178 179 180 181
// Create a heap-allocated copy of pVgEp: vgId and epSet are copied by value
// and qmsg is duplicated with strdup().
//
// A NULL qmsg in the source is copied as NULL instead of being handed to
// strdup(). Returns NULL when the structure cannot be allocated or when the
// string copy fails, so the caller never receives a partially built clone.
// The result is released with tDeleteSMqVgEp().
SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) {
  SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp));
  if (pVgEpNew == NULL) return NULL;

  pVgEpNew->vgId = pVgEp->vgId;
  pVgEpNew->epSet = pVgEp->epSet;
  pVgEpNew->qmsg = NULL;

  if (pVgEp->qmsg != NULL) {
    pVgEpNew->qmsg = strdup(pVgEp->qmsg);
    if (pVgEpNew->qmsg == NULL) {
      taosMemoryFree(pVgEpNew);
      return NULL;
    }
  }
  return pVgEpNew;
}

// Free a vgroup endpoint: its qmsg string first, then the structure itself.
// A NULL pointer is accepted and ignored.
void tDeleteSMqVgEp(SMqVgEp *pVgEp) {
  if (pVgEp == NULL) {
    return;
  }

  taosMemoryFreeClear(pVgEp->qmsg);
  taosMemoryFree(pVgEp);
}

// Encode a vgroup endpoint into buf as: vgId (fixed I32), qmsg (string),
// epSet. Returns the sum of the values returned by the three encoders.
int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) {
  int32_t total = taosEncodeFixedI32(buf, pVgEp->vgId);
  total += taosEncodeString(buf, pVgEp->qmsg);
  total += taosEncodeSEpSet(buf, &pVgEp->epSet);
  return total;
}

// Decode a vgroup endpoint from buf in the order tEncodeSMqVgEp writes it:
// vgId, qmsg, epSet. Returns the buffer position after the decoded fields.
void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp) {
  const void *cursor = buf;

  cursor = taosDecodeFixedI32(cursor, &pVgEp->vgId);
  cursor = taosDecodeString(cursor, &pVgEp->qmsg);
  cursor = taosDecodeSEpSet(cursor, &pVgEp->epSet);

  return (void *)cursor;
}

203 204 205
SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]) {
  SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
  if (pConsumer == NULL) {
S
Shengliang Guan 已提交
206
    terrno = TSDB_CODE_OUT_OF_MEMORY;
207 208 209 210 211 212 213 214 215 216 217 218 219 220 221
    return NULL;
  }

  pConsumer->consumerId = consumerId;
  memcpy(pConsumer->cgroup, cgroup, TSDB_CGROUP_LEN);

  pConsumer->epoch = 0;
  pConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
  pConsumer->hbStatus = 0;

  taosInitRWLatch(&pConsumer->lock);

  pConsumer->currentTopics = taosArrayInit(0, sizeof(void *));
  pConsumer->rebNewTopics = taosArrayInit(0, sizeof(void *));
  pConsumer->rebRemovedTopics = taosArrayInit(0, sizeof(void *));
L
Liu Jicong 已提交
222
  pConsumer->assignedTopics = taosArrayInit(0, sizeof(void *));
223

L
Liu Jicong 已提交
224 225
  if (pConsumer->currentTopics == NULL || pConsumer->rebNewTopics == NULL || pConsumer->rebRemovedTopics == NULL ||
      pConsumer->assignedTopics == NULL) {
226 227 228
    taosArrayDestroy(pConsumer->currentTopics);
    taosArrayDestroy(pConsumer->rebNewTopics);
    taosArrayDestroy(pConsumer->rebRemovedTopics);
L
Liu Jicong 已提交
229
    taosArrayDestroy(pConsumer->assignedTopics);
230 231 232 233
    taosMemoryFree(pConsumer);
    return NULL;
  }

L
Liu Jicong 已提交
234 235
  pConsumer->upTime = taosGetTimestampMs();

236 237 238 239
  return pConsumer;
}

// Destroy the four topic arrays owned by a consumer object, using
// taosMemoryFree as the per-element deleter for each of them.
// The SMqConsumerObj structure itself is not freed here.
void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer) {
  SArray *topicLists[] = {
      pConsumer->currentTopics,
      pConsumer->rebNewTopics,
      pConsumer->rebRemovedTopics,
      pConsumer->assignedTopics,
  };

  for (size_t i = 0; i < sizeof(topicLists) / sizeof(topicLists[0]); ++i) {
    taosArrayDestroyP(topicLists[i], (FDelete)taosMemoryFree);
  }
}

int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
  int32_t tlen = 0;
  int32_t sz;
  tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
L
Liu Jicong 已提交
250
  tlen += taosEncodeString(buf, pConsumer->clientId);
251 252 253 254 255
  tlen += taosEncodeString(buf, pConsumer->cgroup);
  tlen += taosEncodeFixedI8(buf, pConsumer->updateType);
  tlen += taosEncodeFixedI32(buf, pConsumer->epoch);
  tlen += taosEncodeFixedI32(buf, pConsumer->status);

L
Liu Jicong 已提交
256 257 258 259 260 261
  tlen += taosEncodeFixedI32(buf, pConsumer->pid);
  tlen += taosEncodeSEpSet(buf, &pConsumer->ep);
  tlen += taosEncodeFixedI64(buf, pConsumer->upTime);
  tlen += taosEncodeFixedI64(buf, pConsumer->subscribeTime);
  tlen += taosEncodeFixedI64(buf, pConsumer->rebalanceTime);

262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297
  // current topics
  if (pConsumer->currentTopics) {
    sz = taosArrayGetSize(pConsumer->currentTopics);
    tlen += taosEncodeFixedI32(buf, sz);
    for (int32_t i = 0; i < sz; i++) {
      char *topic = taosArrayGetP(pConsumer->currentTopics, i);
      tlen += taosEncodeString(buf, topic);
    }
  } else {
    tlen += taosEncodeFixedI32(buf, 0);
  }

  // reb new topics
  if (pConsumer->rebNewTopics) {
    sz = taosArrayGetSize(pConsumer->rebNewTopics);
    tlen += taosEncodeFixedI32(buf, sz);
    for (int32_t i = 0; i < sz; i++) {
      char *topic = taosArrayGetP(pConsumer->rebNewTopics, i);
      tlen += taosEncodeString(buf, topic);
    }
  } else {
    tlen += taosEncodeFixedI32(buf, 0);
  }

  // reb removed topics
  if (pConsumer->rebRemovedTopics) {
    sz = taosArrayGetSize(pConsumer->rebRemovedTopics);
    tlen += taosEncodeFixedI32(buf, sz);
    for (int32_t i = 0; i < sz; i++) {
      char *topic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
      tlen += taosEncodeString(buf, topic);
    }
  } else {
    tlen += taosEncodeFixedI32(buf, 0);
  }

L
Liu Jicong 已提交
298 299 300 301 302 303 304 305 306 307 308 309
  // lost topics
  if (pConsumer->assignedTopics) {
    sz = taosArrayGetSize(pConsumer->assignedTopics);
    tlen += taosEncodeFixedI32(buf, sz);
    for (int32_t i = 0; i < sz; i++) {
      char *topic = taosArrayGetP(pConsumer->assignedTopics, i);
      tlen += taosEncodeString(buf, topic);
    }
  } else {
    tlen += taosEncodeFixedI32(buf, 0);
  }

310 311 312 313 314 315
  return tlen;
}

void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer) {
  int32_t sz;
  buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
L
Liu Jicong 已提交
316
  buf = taosDecodeStringTo(buf, pConsumer->clientId);
317 318 319 320 321
  buf = taosDecodeStringTo(buf, pConsumer->cgroup);
  buf = taosDecodeFixedI8(buf, &pConsumer->updateType);
  buf = taosDecodeFixedI32(buf, &pConsumer->epoch);
  buf = taosDecodeFixedI32(buf, &pConsumer->status);

L
Liu Jicong 已提交
322 323 324 325 326 327
  buf = taosDecodeFixedI32(buf, &pConsumer->pid);
  buf = taosDecodeSEpSet(buf, &pConsumer->ep);
  buf = taosDecodeFixedI64(buf, &pConsumer->upTime);
  buf = taosDecodeFixedI64(buf, &pConsumer->subscribeTime);
  buf = taosDecodeFixedI64(buf, &pConsumer->rebalanceTime);

328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354
  // current topics
  buf = taosDecodeFixedI32(buf, &sz);
  pConsumer->currentTopics = taosArrayInit(sz, sizeof(void *));
  for (int32_t i = 0; i < sz; i++) {
    char *topic;
    buf = taosDecodeString(buf, &topic);
    taosArrayPush(pConsumer->currentTopics, &topic);
  }

  // reb new topics
  buf = taosDecodeFixedI32(buf, &sz);
  pConsumer->rebNewTopics = taosArrayInit(sz, sizeof(void *));
  for (int32_t i = 0; i < sz; i++) {
    char *topic;
    buf = taosDecodeString(buf, &topic);
    taosArrayPush(pConsumer->rebNewTopics, &topic);
  }

  // reb removed topics
  buf = taosDecodeFixedI32(buf, &sz);
  pConsumer->rebRemovedTopics = taosArrayInit(sz, sizeof(void *));
  for (int32_t i = 0; i < sz; i++) {
    char *topic;
    buf = taosDecodeString(buf, &topic);
    taosArrayPush(pConsumer->rebRemovedTopics, &topic);
  }

L
Liu Jicong 已提交
355 356 357 358 359 360 361 362 363
  // reb removed topics
  buf = taosDecodeFixedI32(buf, &sz);
  pConsumer->assignedTopics = taosArrayInit(sz, sizeof(void *));
  for (int32_t i = 0; i < sz; i++) {
    char *topic;
    buf = taosDecodeString(buf, &topic);
    taosArrayPush(pConsumer->assignedTopics, &topic);
  }

364 365 366
  return (void *)buf;
}

367 368 369 370
SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) {
  SMqConsumerEp *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEp));
  if (pConsumerEpNew == NULL) return NULL;
  pConsumerEpNew->consumerId = pConsumerEpOld->consumerId;
H
Haojun Liao 已提交
371
  pConsumerEpNew->vgs = taosArrayDup(pConsumerEpOld->vgs, (__array_item_dup_fn_t)tCloneSMqVgEp);
372
  return pConsumerEpNew;
373 374
}

375
void tDeleteSMqConsumerEp(void *data) {
L
Liu Jicong 已提交
376
  SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)data;
377
  taosArrayDestroyP(pConsumerEp->vgs, (FDelete)tDeleteSMqVgEp);
378 379
}

380
int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
381
  int32_t tlen = 0;
382 383 384 385
  tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
  tlen += taosEncodeArray(buf, pConsumerEp->vgs, (FEncode)tEncodeSMqVgEp);
#if 0
  int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
L
Liu Jicong 已提交
386 387
  tlen += taosEncodeFixedI32(buf, sz);
  for (int32_t i = 0; i < sz; i++) {
388
    SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
L
Liu Jicong 已提交
389 390
    tlen += tEncodeSMqVgEp(buf, pVgEp);
  }
391
#endif
392 393 394
  return tlen;
}

395 396
// Decode a consumer endpoint from buf: consumerId (fixed I64) followed by the
// vgs array, whose elements are decoded with tDecodeSMqVgEp using
// sizeof(SMqVgEp) as the element size.
// Returns the buffer position after the decoded endpoint.
void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp) {
  const void *cursor = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
  cursor = taosDecodeArray(cursor, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp));
  return (void *)cursor;
}

SMqSubscribeObj *tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]) {
L
Liu Jicong 已提交
413
  SMqSubscribeObj *pSubNew = taosMemoryCalloc(1, sizeof(SMqSubscribeObj));
414 415 416
  if (pSubNew == NULL) return NULL;
  memcpy(pSubNew->key, key, TSDB_SUBSCRIBE_KEY_LEN);
  taosInitRWLatch(&pSubNew->lock);
L
Liu Jicong 已提交
417
  pSubNew->vgNum = 0;
418
  pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
419 420 421 422 423
  // TODO set hash free fp
  /*taosHashSetFreeFp(pSubNew->consumerHash, tDeleteSMqConsumerEp);*/

  pSubNew->unassignedVgs = taosArrayInit(0, sizeof(void *));

424 425 426 427 428 429 430 431
  return pSubNew;
}

SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
  SMqSubscribeObj *pSubNew = taosMemoryMalloc(sizeof(SMqSubscribeObj));
  if (pSubNew == NULL) return NULL;
  memcpy(pSubNew->key, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
  taosInitRWLatch(&pSubNew->lock);
L
Liu Jicong 已提交
432

L
Liu Jicong 已提交
433
  pSubNew->dbUid = pSub->dbUid;
L
Liu Jicong 已提交
434
  pSubNew->stbUid = pSub->stbUid;
L
Liu Jicong 已提交
435
  pSubNew->subType = pSub->subType;
L
Liu Jicong 已提交
436
  pSubNew->withMeta = pSub->withMeta;
L
Liu Jicong 已提交
437

438 439
  pSubNew->vgNum = pSub->vgNum;
  pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
440 441 442 443
  // TODO set hash free fp
  /*taosHashSetFreeFp(pSubNew->consumerHash, tDeleteSMqConsumerEp);*/
  void          *pIter = NULL;
  SMqConsumerEp *pConsumerEp = NULL;
444
  while (1) {
L
Liu Jicong 已提交
445
    pIter = taosHashIterate(pSub->consumerHash, pIter);
446
    if (pIter == NULL) break;
447 448 449
    pConsumerEp = (SMqConsumerEp *)pIter;
    SMqConsumerEp newEp = {
        .consumerId = pConsumerEp->consumerId,
H
Haojun Liao 已提交
450
        .vgs = taosArrayDup(pConsumerEp->vgs, (__array_item_dup_fn_t)tCloneSMqVgEp),
451
    };
452
    taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp, sizeof(SMqConsumerEp));
453
  }
H
Haojun Liao 已提交
454
  pSubNew->unassignedVgs = taosArrayDup(pSub->unassignedVgs, (__array_item_dup_fn_t)tCloneSMqVgEp);
L
Liu Jicong 已提交
455
  memcpy(pSubNew->dbName, pSub->dbName, TSDB_DB_FNAME_LEN);
456 457 458 459
  return pSubNew;
}

// Release what a subscribe object owns: the vgs array of every entry in the
// consumer hash, the hash itself, and the unassignedVgs array. Array elements
// are deleted with tDeleteSMqVgEp.
// The SMqSubscribeObj structure itself is not freed here.
void tDeleteSubscribeObj(SMqSubscribeObj *pSub) {
  for (void *pIter = taosHashIterate(pSub->consumerHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pSub->consumerHash, pIter)) {
    SMqConsumerEp *pEp = (SMqConsumerEp *)pIter;
    taosArrayDestroyP(pEp->vgs, (FDelete)tDeleteSMqVgEp);
  }

  taosHashCleanup(pSub->consumerHash);
  taosArrayDestroyP(pSub->unassignedVgs, (FDelete)tDeleteSMqVgEp);
}

int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
  int32_t tlen = 0;
  tlen += taosEncodeString(buf, pSub->key);
L
Liu Jicong 已提交
474
  tlen += taosEncodeFixedI64(buf, pSub->dbUid);
475
  tlen += taosEncodeFixedI32(buf, pSub->vgNum);
L
Liu Jicong 已提交
476
  tlen += taosEncodeFixedI8(buf, pSub->subType);
L
Liu Jicong 已提交
477
  tlen += taosEncodeFixedI8(buf, pSub->withMeta);
L
Liu Jicong 已提交
478
  tlen += taosEncodeFixedI64(buf, pSub->stbUid);
479 480 481 482 483 484 485 486 487

  void   *pIter = NULL;
  int32_t sz = taosHashGetSize(pSub->consumerHash);
  tlen += taosEncodeFixedI32(buf, sz);

  int32_t cnt = 0;
  while (1) {
    pIter = taosHashIterate(pSub->consumerHash, pIter);
    if (pIter == NULL) break;
488 489
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    tlen += tEncodeSMqConsumerEp(buf, pConsumerEp);
490 491 492
    cnt++;
  }
  ASSERT(cnt == sz);
493
  tlen += taosEncodeArray(buf, pSub->unassignedVgs, (FEncode)tEncodeSMqVgEp);
L
Liu Jicong 已提交
494
  tlen += taosEncodeString(buf, pSub->dbName);
495 496 497 498 499 500
  return tlen;
}

// Decode a subscribe object from buf, in the layout written by
// tEncodeSubscribeObj. A new consumer hash is created and each decoded
// consumer endpoint is stored in it by value under its consumerId; then
// unassignedVgs and dbName are read.
// Returns the buffer position after the decoded object.
void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) {
  buf = taosDecodeStringTo(buf, pSub->key);
  buf = taosDecodeFixedI64(buf, &pSub->dbUid);
  buf = taosDecodeFixedI32(buf, &pSub->vgNum);
  buf = taosDecodeFixedI8(buf, &pSub->subType);
  buf = taosDecodeFixedI8(buf, &pSub->withMeta);
  buf = taosDecodeFixedI64(buf, &pSub->stbUid);

  int32_t numConsumers;
  buf = taosDecodeFixedI32(buf, &numConsumers);

  pSub->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
  for (int32_t n = 0; n < numConsumers; ++n) {
    SMqConsumerEp decodedEp = {0};
    buf = tDecodeSMqConsumerEp(buf, &decodedEp);
    taosHashPut(pSub->consumerHash, &decodedEp.consumerId, sizeof(int64_t), &decodedEp, sizeof(SMqConsumerEp));
  }

  buf = taosDecodeArray(buf, &pSub->unassignedVgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp));
  buf = taosDecodeStringTo(buf, pSub->dbName);
  return (void *)buf;
}

// Create a heap-allocated copy of a sub-action log entry. epoch is copied by
// value and consumers is duplicated with taosArrayDup, cloning each element
// through tCloneSMqConsumerEp. Returns NULL if the entry cannot be allocated.
SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
  SMqSubActionLogEntry *pCopy = taosMemoryMalloc(sizeof(SMqSubActionLogEntry));
  if (pCopy == NULL) {
    return NULL;
  }

  pCopy->epoch = pEntry->epoch;
  pCopy->consumers = taosArrayDup(pEntry->consumers, (__array_item_dup_fn_t)tCloneSMqConsumerEp);

  return pCopy;
}

// Release the consumers array of a sub-action log entry, using
// tDeleteSMqConsumerEp as the per-element deleter.
// The SMqSubActionLogEntry structure itself is not freed here.
void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
  SArray *pConsumers = pEntry->consumers;
  taosArrayDestroyEx(pConsumers, (FDelete)tDeleteSMqConsumerEp);
}

int32_t tEncodeSMqSubActionLogEntry(void **buf, const SMqSubActionLogEntry *pEntry) {
  int32_t tlen = 0;
  tlen += taosEncodeFixedI32(buf, pEntry->epoch);
  tlen += taosEncodeArray(buf, pEntry->consumers, (FEncode)tEncodeSMqSubActionLogEntry);
  return tlen;
}
L
Liu Jicong 已提交
540

541 542 543 544 545 546 547 548 549 550
void *tDecodeSMqSubActionLogEntry(const void *buf, SMqSubActionLogEntry *pEntry) {
  buf = taosDecodeFixedI32(buf, &pEntry->epoch);
  buf = taosDecodeArray(buf, &pEntry->consumers, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
  return (void *)buf;
}

// Create a heap-allocated copy of a sub-action log object. key is copied
// (TSDB_SUBSCRIBE_KEY_LEN bytes) and logs is duplicated with taosArrayDup.
//
// The elements of `logs` are encoded and decoded as SMqSubActionLogEntry
// (see tEncodeSMqSubActionLogObj / tDecodeSMqSubActionLogObj), so each one is
// cloned with tCloneSMqSubActionLogEntry rather than the consumer-endpoint
// cloner. Returns NULL if the object cannot be allocated.
SMqSubActionLogObj *tCloneSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
  SMqSubActionLogObj *pLogNew = taosMemoryMalloc(sizeof(SMqSubActionLogObj));
  if (pLogNew == NULL) return NULL;
  memcpy(pLogNew->key, pLog->key, TSDB_SUBSCRIBE_KEY_LEN);
  pLogNew->logs = taosArrayDup(pLog->logs, (__array_item_dup_fn_t)tCloneSMqSubActionLogEntry);
  return pLogNew;
}

// Release the logs array of a sub-action log object.
//
// The elements of `logs` are encoded and decoded as SMqSubActionLogEntry, so
// they are deleted with tDeleteSMqSubActionLogEntry rather than the
// consumer-endpoint deleter.
// The SMqSubActionLogObj structure itself is not freed here.
// NOTE(review): this keeps taosArrayDestroyEx as in the original; confirm that
// its callback convention matches how the elements of `logs` are stored.
void tDeleteSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
  taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqSubActionLogEntry);
}

// Encode a sub-action log object into buf: key (string) followed by the logs
// array, each element encoded with tEncodeSMqSubActionLogEntry.
// Returns the sum of the values returned by the two encoders.
int32_t tEncodeSMqSubActionLogObj(void **buf, const SMqSubActionLogObj *pLog) {
  int32_t total = taosEncodeString(buf, pLog->key);
  total += taosEncodeArray(buf, pLog->logs, (FEncode)tEncodeSMqSubActionLogEntry);
  return total;
}

// Decode a sub-action log object from buf: key into pLog->key, then the logs
// array whose elements are decoded with tDecodeSMqSubActionLogEntry using
// sizeof(SMqSubActionLogEntry) as the element size.
// Returns the buffer position after the decoded object.
void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) {
  const void *cursor = taosDecodeStringTo(buf, pLog->key);
  cursor = taosDecodeArray(cursor, &pLog->logs, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
  return (void *)cursor;
}
L
Liu Jicong 已提交
571

S
Shengliang Guan 已提交
572 573 574 575 576 577 578 579 580 581 582
// Encode an offset object into buf: key (string) followed by offset
// (fixed I64). Returns the sum of the values returned by the two encoders.
int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) {
  int32_t total = taosEncodeString(buf, pOffset->key);
  total += taosEncodeFixedI64(buf, pOffset->offset);
  return total;
}

// Decode an offset object from buf in the order tEncodeSMqOffsetObj writes
// it: key into pOffset->key, then offset.
// Returns the buffer position after the decoded object.
void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) {
  void *cursor = buf;

  cursor = taosDecodeStringTo(cursor, pOffset->key);
  cursor = taosDecodeFixedI64(cursor, &pOffset->offset);

  return cursor;
}