/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#define _DEFAULT_SOURCE
#include "mndDef.h"
#include "mndConsumer.h"

/*
 * Serialize a stream object through pEncoder.
 *
 * Fields are written in a fixed order (the same order tDecodeSStreamObj reads
 * them): name, timestamps/version/level info, ids and status, trigger
 * options, source/target database info, the sql/ast/physicalPlan strings, the
 * nested task list (outer count, then for each level an inner count followed
 * by every task), and finally the output schema.
 *
 * Returns -1 as soon as one encode call reports a negative result, otherwise
 * pEncoder->pos.
 */
int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
  if (tEncodeCStr(pEncoder, pObj->name) < 0) return -1;

  // `||` evaluates left to right and stops at the first failure, so the
  // on-wire field order is exactly the order written here.
  if (tEncodeI64(pEncoder, pObj->createTime) < 0 || tEncodeI64(pEncoder, pObj->updateTime) < 0 ||
      tEncodeI32(pEncoder, pObj->version) < 0 || tEncodeI32(pEncoder, pObj->totalLevel) < 0 ||
      tEncodeI64(pEncoder, pObj->smaId) < 0) {
    return -1;
  }

  if (tEncodeI64(pEncoder, pObj->uid) < 0 || tEncodeI8(pEncoder, pObj->status) < 0) {
    return -1;
  }

  if (tEncodeI8(pEncoder, pObj->igExpired) < 0 || tEncodeI8(pEncoder, pObj->trigger) < 0 ||
      tEncodeI64(pEncoder, pObj->triggerParam) < 0 || tEncodeI64(pEncoder, pObj->watermark) < 0) {
    return -1;
  }

  if (tEncodeI64(pEncoder, pObj->sourceDbUid) < 0 || tEncodeI64(pEncoder, pObj->targetDbUid) < 0 ||
      tEncodeCStr(pEncoder, pObj->sourceDb) < 0 || tEncodeCStr(pEncoder, pObj->targetDb) < 0 ||
      tEncodeCStr(pEncoder, pObj->targetSTbName) < 0 || tEncodeI64(pEncoder, pObj->targetStbUid) < 0 ||
      tEncodeI32(pEncoder, pObj->fixedSinkVgId) < 0) {
    return -1;
  }

  if (tEncodeCStr(pEncoder, pObj->sql) < 0 || tEncodeCStr(pEncoder, pObj->ast) < 0 ||
      tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) {
    return -1;
  }

  // tasks is an array of arrays: one inner array of task pointers per level
  int32_t levelNum = taosArrayGetSize(pObj->tasks);
  if (tEncodeI32(pEncoder, levelNum) < 0) return -1;

  for (int32_t level = 0; level < levelNum; level++) {
    SArray *pLevel = taosArrayGetP(pObj->tasks, level);
    int32_t taskNum = taosArrayGetSize(pLevel);
    if (tEncodeI32(pEncoder, taskNum) < 0) return -1;

    for (int32_t t = 0; t < taskNum; t++) {
      SStreamTask *pTask = taosArrayGetP(pLevel, t);
      if (tEncodeSStreamTask(pEncoder, pTask) < 0) return -1;
    }
  }

  if (tEncodeSSchemaWrapper(pEncoder, &pObj->outputSchema) < 0) return -1;

  return pEncoder->pos;
}

int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
  if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1;
L
Liu Jicong 已提交
68

L
Liu Jicong 已提交
69 70 71
  if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
  if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1;
  if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1;
L
Liu Jicong 已提交
72
  if (tDecodeI32(pDecoder, &pObj->totalLevel) < 0) return -1;
L
Liu Jicong 已提交
73 74 75
  if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1;

  if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;
L
Liu Jicong 已提交
76
  if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1;
L
Liu Jicong 已提交
77

78
  if (tDecodeI8(pDecoder, &pObj->igExpired) < 0) return -1;
L
Liu Jicong 已提交
79
  if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1;
80 81
  if (tDecodeI64(pDecoder, &pObj->triggerParam) < 0) return -1;
  if (tDecodeI64(pDecoder, &pObj->watermark) < 0) return -1;
L
Liu Jicong 已提交
82 83 84 85 86 87 88

  if (tDecodeI64(pDecoder, &pObj->sourceDbUid) < 0) return -1;
  if (tDecodeI64(pDecoder, &pObj->targetDbUid) < 0) return -1;
  if (tDecodeCStrTo(pDecoder, pObj->sourceDb) < 0) return -1;
  if (tDecodeCStrTo(pDecoder, pObj->targetDb) < 0) return -1;
  if (tDecodeCStrTo(pDecoder, pObj->targetSTbName) < 0) return -1;
  if (tDecodeI64(pDecoder, &pObj->targetStbUid) < 0) return -1;
L
Liu Jicong 已提交
89
  if (tDecodeI32(pDecoder, &pObj->fixedSinkVgId) < 0) return -1;
L
Liu Jicong 已提交
90

L
Liu Jicong 已提交
91
  if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1;
L
Liu Jicong 已提交
92
  if (tDecodeCStrAlloc(pDecoder, &pObj->ast) < 0) return -1;
L
Liu Jicong 已提交
93
  if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1;
L
Liu Jicong 已提交
94

L
Liu Jicong 已提交
95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114
  pObj->tasks = NULL;
  int32_t sz;
  if (tDecodeI32(pDecoder, &sz) < 0) return -1;
  if (sz != 0) {
    pObj->tasks = taosArrayInit(sz, sizeof(void *));
    for (int32_t i = 0; i < sz; i++) {
      int32_t innerSz;
      if (tDecodeI32(pDecoder, &innerSz) < 0) return -1;
      SArray *pArray = taosArrayInit(innerSz, sizeof(void *));
      for (int32_t j = 0; j < innerSz; j++) {
        SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
        if (pTask == NULL) return -1;
        if (tDecodeSStreamTask(pDecoder, pTask) < 0) return -1;
        taosArrayPush(pArray, &pTask);
      }
      taosArrayPush(pObj->tasks, &pArray);
    }
  }

  if (tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema) < 0) return -1;
L
Liu Jicong 已提交
115

L
Liu Jicong 已提交
116 117 118
  return 0;
}

L
Liu Jicong 已提交
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
/*
 * Release the memory owned by a stream object: the sql/ast/physicalPlan
 * strings, the output schema columns (only when nCols is non-zero) and every
 * task of every level together with the level arrays and the outer array.
 * pStream itself is not freed here.
 */
void tFreeStreamObj(SStreamObj *pStream) {
  taosMemoryFree(pStream->sql);
  taosMemoryFree(pStream->ast);
  taosMemoryFree(pStream->physicalPlan);

  if (pStream->outputSchema.nCols) {
    taosMemoryFree(pStream->outputSchema.pSchema);
  }

  int32_t levelNum = taosArrayGetSize(pStream->tasks);
  for (int32_t level = 0; level < levelNum; level++) {
    SArray *pTasks = taosArrayGetP(pStream->tasks, level);
    int32_t taskNum = taosArrayGetSize(pTasks);

    for (int32_t t = 0; t < taskNum; t++) {
      tFreeSStreamTask((SStreamTask *)taosArrayGetP(pTasks, t));
    }
    taosArrayDestroy(pTasks);
  }

  taosArrayDestroy(pStream->tasks);
}

L
Liu Jicong 已提交
138 139 140 141 142 143 144 145 146 147
/*
 * Create a heap-allocated copy of pVgEp: vgId and epSet are copied by value,
 * qmsg is duplicated with strdup.
 *
 * A NULL qmsg in the source is copied as NULL (strdup must not be called on
 * a NULL pointer). Returns NULL when either the struct or the qmsg copy
 * cannot be allocated; nothing is leaked in that case.
 */
SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) {
  SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp));
  if (pVgEpNew == NULL) return NULL;

  pVgEpNew->vgId = pVgEp->vgId;
  pVgEpNew->epSet = pVgEp->epSet;
  pVgEpNew->qmsg = NULL;

  if (pVgEp->qmsg != NULL) {
    pVgEpNew->qmsg = strdup(pVgEp->qmsg);
    if (pVgEpNew->qmsg == NULL) {
      // do not hand out a clone that silently lost its query message
      taosMemoryFree(pVgEpNew);
      return NULL;
    }
  }

  return pVgEpNew;
}

/*
 * Free a vgroup endpoint: its qmsg string first, then the struct itself.
 * A NULL pVgEp is ignored.
 */
void tDeleteSMqVgEp(SMqVgEp *pVgEp) {
  if (pVgEp == NULL) {
    return;
  }

  taosMemoryFreeClear(pVgEp->qmsg);
  taosMemoryFree(pVgEp);
}

/*
 * Encode a vgroup endpoint into *buf as: vgId (fixed int32), qmsg (string),
 * epSet. Returns the sum of the lengths reported by the three encoders.
 */
int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) {
  // The calls must stay separate statements: each one advances *buf.
  int32_t len = taosEncodeFixedI32(buf, pVgEp->vgId);
  len += taosEncodeString(buf, pVgEp->qmsg);
  len += taosEncodeSEpSet(buf, &pVgEp->epSet);
  return len;
}

/*
 * Decode a vgroup endpoint from buf in the order tEncodeSMqVgEp writes it:
 * vgId, qmsg, epSet. Returns the read position after the last field.
 */
void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp) {
  const void *pos = buf;

  pos = taosDecodeFixedI32(pos, &pVgEp->vgId);
  pos = taosDecodeString(pos, &pVgEp->qmsg);
  pos = taosDecodeSEpSet(pos, &pVgEp->epSet);

  return (void *)pos;
}

169 170 171
SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]) {
  SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
  if (pConsumer == NULL) {
S
Shengliang Guan 已提交
172
    terrno = TSDB_CODE_OUT_OF_MEMORY;
173 174 175 176 177 178 179 180 181 182 183 184 185 186 187
    return NULL;
  }

  pConsumer->consumerId = consumerId;
  memcpy(pConsumer->cgroup, cgroup, TSDB_CGROUP_LEN);

  pConsumer->epoch = 0;
  pConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
  pConsumer->hbStatus = 0;

  taosInitRWLatch(&pConsumer->lock);

  pConsumer->currentTopics = taosArrayInit(0, sizeof(void *));
  pConsumer->rebNewTopics = taosArrayInit(0, sizeof(void *));
  pConsumer->rebRemovedTopics = taosArrayInit(0, sizeof(void *));
L
Liu Jicong 已提交
188
  pConsumer->assignedTopics = taosArrayInit(0, sizeof(void *));
189

L
Liu Jicong 已提交
190 191
  if (pConsumer->currentTopics == NULL || pConsumer->rebNewTopics == NULL || pConsumer->rebRemovedTopics == NULL ||
      pConsumer->assignedTopics == NULL) {
192 193 194
    taosArrayDestroy(pConsumer->currentTopics);
    taosArrayDestroy(pConsumer->rebNewTopics);
    taosArrayDestroy(pConsumer->rebRemovedTopics);
L
Liu Jicong 已提交
195
    taosArrayDestroy(pConsumer->assignedTopics);
196 197 198 199
    taosMemoryFree(pConsumer);
    return NULL;
  }

L
Liu Jicong 已提交
200 201
  pConsumer->upTime = taosGetTimestampMs();

202 203 204 205
  return pConsumer;
}

/*
 * Destroy the four topic arrays of a consumer (currentTopics, rebNewTopics,
 * rebRemovedTopics, assignedTopics), freeing each stored topic pointer with
 * taosMemoryFree. pConsumer itself is not freed here.
 */
void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer) {
  SArray *topicLists[] = {
      pConsumer->currentTopics,
      pConsumer->rebNewTopics,
      pConsumer->rebRemovedTopics,
      pConsumer->assignedTopics,
  };
  const int32_t listNum = (int32_t)(sizeof(topicLists) / sizeof(topicLists[0]));

  for (int32_t i = 0; i < listNum; i++) {
    taosArrayDestroyP(topicLists[i], (FDelete)taosMemoryFree);
  }
}

int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
  int32_t tlen = 0;
  int32_t sz;
  tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
L
Liu Jicong 已提交
216
  tlen += taosEncodeString(buf, pConsumer->clientId);
217 218 219 220 221
  tlen += taosEncodeString(buf, pConsumer->cgroup);
  tlen += taosEncodeFixedI8(buf, pConsumer->updateType);
  tlen += taosEncodeFixedI32(buf, pConsumer->epoch);
  tlen += taosEncodeFixedI32(buf, pConsumer->status);

L
Liu Jicong 已提交
222 223 224 225 226 227
  tlen += taosEncodeFixedI32(buf, pConsumer->pid);
  tlen += taosEncodeSEpSet(buf, &pConsumer->ep);
  tlen += taosEncodeFixedI64(buf, pConsumer->upTime);
  tlen += taosEncodeFixedI64(buf, pConsumer->subscribeTime);
  tlen += taosEncodeFixedI64(buf, pConsumer->rebalanceTime);

228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263
  // current topics
  if (pConsumer->currentTopics) {
    sz = taosArrayGetSize(pConsumer->currentTopics);
    tlen += taosEncodeFixedI32(buf, sz);
    for (int32_t i = 0; i < sz; i++) {
      char *topic = taosArrayGetP(pConsumer->currentTopics, i);
      tlen += taosEncodeString(buf, topic);
    }
  } else {
    tlen += taosEncodeFixedI32(buf, 0);
  }

  // reb new topics
  if (pConsumer->rebNewTopics) {
    sz = taosArrayGetSize(pConsumer->rebNewTopics);
    tlen += taosEncodeFixedI32(buf, sz);
    for (int32_t i = 0; i < sz; i++) {
      char *topic = taosArrayGetP(pConsumer->rebNewTopics, i);
      tlen += taosEncodeString(buf, topic);
    }
  } else {
    tlen += taosEncodeFixedI32(buf, 0);
  }

  // reb removed topics
  if (pConsumer->rebRemovedTopics) {
    sz = taosArrayGetSize(pConsumer->rebRemovedTopics);
    tlen += taosEncodeFixedI32(buf, sz);
    for (int32_t i = 0; i < sz; i++) {
      char *topic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
      tlen += taosEncodeString(buf, topic);
    }
  } else {
    tlen += taosEncodeFixedI32(buf, 0);
  }

L
Liu Jicong 已提交
264 265 266 267 268 269 270 271 272 273 274 275
  // lost topics
  if (pConsumer->assignedTopics) {
    sz = taosArrayGetSize(pConsumer->assignedTopics);
    tlen += taosEncodeFixedI32(buf, sz);
    for (int32_t i = 0; i < sz; i++) {
      char *topic = taosArrayGetP(pConsumer->assignedTopics, i);
      tlen += taosEncodeString(buf, topic);
    }
  } else {
    tlen += taosEncodeFixedI32(buf, 0);
  }

276 277 278 279 280 281
  return tlen;
}

void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer) {
  int32_t sz;
  buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
L
Liu Jicong 已提交
282
  buf = taosDecodeStringTo(buf, pConsumer->clientId);
283 284 285 286 287
  buf = taosDecodeStringTo(buf, pConsumer->cgroup);
  buf = taosDecodeFixedI8(buf, &pConsumer->updateType);
  buf = taosDecodeFixedI32(buf, &pConsumer->epoch);
  buf = taosDecodeFixedI32(buf, &pConsumer->status);

L
Liu Jicong 已提交
288 289 290 291 292 293
  buf = taosDecodeFixedI32(buf, &pConsumer->pid);
  buf = taosDecodeSEpSet(buf, &pConsumer->ep);
  buf = taosDecodeFixedI64(buf, &pConsumer->upTime);
  buf = taosDecodeFixedI64(buf, &pConsumer->subscribeTime);
  buf = taosDecodeFixedI64(buf, &pConsumer->rebalanceTime);

294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320
  // current topics
  buf = taosDecodeFixedI32(buf, &sz);
  pConsumer->currentTopics = taosArrayInit(sz, sizeof(void *));
  for (int32_t i = 0; i < sz; i++) {
    char *topic;
    buf = taosDecodeString(buf, &topic);
    taosArrayPush(pConsumer->currentTopics, &topic);
  }

  // reb new topics
  buf = taosDecodeFixedI32(buf, &sz);
  pConsumer->rebNewTopics = taosArrayInit(sz, sizeof(void *));
  for (int32_t i = 0; i < sz; i++) {
    char *topic;
    buf = taosDecodeString(buf, &topic);
    taosArrayPush(pConsumer->rebNewTopics, &topic);
  }

  // reb removed topics
  buf = taosDecodeFixedI32(buf, &sz);
  pConsumer->rebRemovedTopics = taosArrayInit(sz, sizeof(void *));
  for (int32_t i = 0; i < sz; i++) {
    char *topic;
    buf = taosDecodeString(buf, &topic);
    taosArrayPush(pConsumer->rebRemovedTopics, &topic);
  }

L
Liu Jicong 已提交
321 322 323 324 325 326 327 328 329
  // reb removed topics
  buf = taosDecodeFixedI32(buf, &sz);
  pConsumer->assignedTopics = taosArrayInit(sz, sizeof(void *));
  for (int32_t i = 0; i < sz; i++) {
    char *topic;
    buf = taosDecodeString(buf, &topic);
    taosArrayPush(pConsumer->assignedTopics, &topic);
  }

330 331 332
  return (void *)buf;
}

333 334 335 336 337 338
SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) {
  SMqConsumerEp *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEp));
  if (pConsumerEpNew == NULL) return NULL;
  pConsumerEpNew->consumerId = pConsumerEpOld->consumerId;
  pConsumerEpNew->vgs = taosArrayDeepCopy(pConsumerEpOld->vgs, (FCopy)tCloneSMqVgEp);
  return pConsumerEpNew;
339 340
}

341 342
void tDeleteSMqConsumerEp(void *data) {
  SMqConsumerEp *pConsumerEp = (SMqConsumerEp*)data;
343
  taosArrayDestroyP(pConsumerEp->vgs, (FDelete)tDeleteSMqVgEp);
344 345
}

346
int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
347
  int32_t tlen = 0;
348 349 350 351
  tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
  tlen += taosEncodeArray(buf, pConsumerEp->vgs, (FEncode)tEncodeSMqVgEp);
#if 0
  int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
L
Liu Jicong 已提交
352 353
  tlen += taosEncodeFixedI32(buf, sz);
  for (int32_t i = 0; i < sz; i++) {
354
    SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
L
Liu Jicong 已提交
355 356
    tlen += tEncodeSMqVgEp(buf, pVgEp);
  }
357
#endif
358 359 360
  return tlen;
}

361 362
/*
 * Decode a consumer endpoint from buf: consumerId (fixed int64) followed by
 * the vgs array, each element decoded with tDecodeSMqVgEp.
 * Returns the read position after the last field.
 */
void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp) {
  buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
  buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp));
  return (void *)buf;
}

SMqSubscribeObj *tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]) {
L
Liu Jicong 已提交
379
  SMqSubscribeObj *pSubNew = taosMemoryCalloc(1, sizeof(SMqSubscribeObj));
380 381 382
  if (pSubNew == NULL) return NULL;
  memcpy(pSubNew->key, key, TSDB_SUBSCRIBE_KEY_LEN);
  taosInitRWLatch(&pSubNew->lock);
L
Liu Jicong 已提交
383
  pSubNew->vgNum = 0;
384
  pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
385 386 387 388 389
  // TODO set hash free fp
  /*taosHashSetFreeFp(pSubNew->consumerHash, tDeleteSMqConsumerEp);*/

  pSubNew->unassignedVgs = taosArrayInit(0, sizeof(void *));

390 391 392 393 394 395 396 397
  return pSubNew;
}

SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
  SMqSubscribeObj *pSubNew = taosMemoryMalloc(sizeof(SMqSubscribeObj));
  if (pSubNew == NULL) return NULL;
  memcpy(pSubNew->key, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
  taosInitRWLatch(&pSubNew->lock);
L
Liu Jicong 已提交
398

L
Liu Jicong 已提交
399
  pSubNew->dbUid = pSub->dbUid;
L
Liu Jicong 已提交
400
  pSubNew->stbUid = pSub->stbUid;
L
Liu Jicong 已提交
401
  pSubNew->subType = pSub->subType;
L
Liu Jicong 已提交
402
  pSubNew->withMeta = pSub->withMeta;
L
Liu Jicong 已提交
403

404 405
  pSubNew->vgNum = pSub->vgNum;
  pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
406 407 408 409
  // TODO set hash free fp
  /*taosHashSetFreeFp(pSubNew->consumerHash, tDeleteSMqConsumerEp);*/
  void          *pIter = NULL;
  SMqConsumerEp *pConsumerEp = NULL;
410
  while (1) {
L
Liu Jicong 已提交
411
    pIter = taosHashIterate(pSub->consumerHash, pIter);
412
    if (pIter == NULL) break;
413 414 415 416
    pConsumerEp = (SMqConsumerEp *)pIter;
    SMqConsumerEp newEp = {
        .consumerId = pConsumerEp->consumerId,
        .vgs = taosArrayDeepCopy(pConsumerEp->vgs, (FCopy)tCloneSMqVgEp),
417
    };
418
    taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp, sizeof(SMqConsumerEp));
419
  }
420
  pSubNew->unassignedVgs = taosArrayDeepCopy(pSub->unassignedVgs, (FCopy)tCloneSMqVgEp);
L
Liu Jicong 已提交
421
  memcpy(pSubNew->dbName, pSub->dbName, TSDB_DB_FNAME_LEN);
422 423 424 425
  return pSubNew;
}

/*
 * Release what a subscribe object owns: the vgs array of every entry in the
 * consumer hash, the hash itself, and the unassignedVgs array together with
 * its vgroup endpoints. pSub itself is not freed here.
 */
void tDeleteSubscribeObj(SMqSubscribeObj *pSub) {
  for (void *pIter = taosHashIterate(pSub->consumerHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pSub->consumerHash, pIter)) {
    SArray *pVgs = ((SMqConsumerEp *)pIter)->vgs;
    taosArrayDestroyP(pVgs, (FDelete)tDeleteSMqVgEp);
  }

  taosHashCleanup(pSub->consumerHash);
  taosArrayDestroyP(pSub->unassignedVgs, (FDelete)tDeleteSMqVgEp);
}

int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
  int32_t tlen = 0;
  tlen += taosEncodeString(buf, pSub->key);
L
Liu Jicong 已提交
440
  tlen += taosEncodeFixedI64(buf, pSub->dbUid);
441
  tlen += taosEncodeFixedI32(buf, pSub->vgNum);
L
Liu Jicong 已提交
442
  tlen += taosEncodeFixedI8(buf, pSub->subType);
L
Liu Jicong 已提交
443
  tlen += taosEncodeFixedI8(buf, pSub->withMeta);
L
Liu Jicong 已提交
444
  tlen += taosEncodeFixedI64(buf, pSub->stbUid);
445 446 447 448 449 450 451 452 453

  void   *pIter = NULL;
  int32_t sz = taosHashGetSize(pSub->consumerHash);
  tlen += taosEncodeFixedI32(buf, sz);

  int32_t cnt = 0;
  while (1) {
    pIter = taosHashIterate(pSub->consumerHash, pIter);
    if (pIter == NULL) break;
454 455
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    tlen += tEncodeSMqConsumerEp(buf, pConsumerEp);
456 457 458
    cnt++;
  }
  ASSERT(cnt == sz);
459
  tlen += taosEncodeArray(buf, pSub->unassignedVgs, (FEncode)tEncodeSMqVgEp);
L
Liu Jicong 已提交
460
  tlen += taosEncodeString(buf, pSub->dbName);
461 462 463 464 465 466
  return tlen;
}

/*
 * Decode a subscribe object from buf, reading fields in the order
 * tEncodeSubscribeObj writes them. A new consumer hash is created and each
 * decoded consumer endpoint is stored in it under its consumerId; then the
 * unassignedVgs array and dbName are read.
 * Returns the read position after the last field.
 */
void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) {
  buf = taosDecodeStringTo(buf, pSub->key);
  buf = taosDecodeFixedI64(buf, &pSub->dbUid);
  buf = taosDecodeFixedI32(buf, &pSub->vgNum);
  buf = taosDecodeFixedI8(buf, &pSub->subType);
  buf = taosDecodeFixedI8(buf, &pSub->withMeta);
  buf = taosDecodeFixedI64(buf, &pSub->stbUid);

  int32_t consumerNum;
  buf = taosDecodeFixedI32(buf, &consumerNum);

  pSub->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
  for (int32_t remaining = consumerNum; remaining > 0; remaining--) {
    SMqConsumerEp ep = {0};
    buf = tDecodeSMqConsumerEp(buf, &ep);
    taosHashPut(pSub->consumerHash, &ep.consumerId, sizeof(int64_t), &ep, sizeof(SMqConsumerEp));
  }

  buf = taosDecodeArray(buf, &pSub->unassignedVgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp));
  buf = taosDecodeStringTo(buf, pSub->dbName);
  return (void *)buf;
}

/*
 * Create a heap-allocated copy of a log entry: epoch is copied and the
 * consumers array is deep-copied with tCloneSMqConsumerEp.
 * Returns NULL if the struct cannot be allocated.
 */
SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
  SMqSubActionLogEntry *pCopy = taosMemoryMalloc(sizeof(SMqSubActionLogEntry));
  if (pCopy != NULL) {
    pCopy->epoch = pEntry->epoch;
    pCopy->consumers = taosArrayDeepCopy(pEntry->consumers, (FCopy)tCloneSMqConsumerEp);
  }
  return pCopy;
}

/*
 * Destroy the consumers array of a log entry via taosArrayDestroyEx, using
 * tDeleteSMqConsumerEp as the per-element callback. pEntry itself is not
 * freed here.
 */
void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
  SArray *pConsumers = pEntry->consumers;
  taosArrayDestroyEx(pConsumers, (FDelete)tDeleteSMqConsumerEp);
}

int32_t tEncodeSMqSubActionLogEntry(void **buf, const SMqSubActionLogEntry *pEntry) {
  int32_t tlen = 0;
  tlen += taosEncodeFixedI32(buf, pEntry->epoch);
  tlen += taosEncodeArray(buf, pEntry->consumers, (FEncode)tEncodeSMqSubActionLogEntry);
  return tlen;
}
L
Liu Jicong 已提交
506

507 508 509 510 511 512 513 514 515 516
/*
 * Decode a log entry from buf: epoch (fixed int32) followed by the consumers
 * array.
 *
 * The elements of consumers are consumer endpoints (see the clone/delete
 * functions for this type), so each one is decoded with tDecodeSMqConsumerEp
 * and sized as SMqConsumerEp rather than recursing into this function with
 * the entry size.
 *
 * Returns the read position after the last field.
 */
void *tDecodeSMqSubActionLogEntry(const void *buf, SMqSubActionLogEntry *pEntry) {
  buf = taosDecodeFixedI32(buf, &pEntry->epoch);
  buf = taosDecodeArray(buf, &pEntry->consumers, (FDecode)tDecodeSMqConsumerEp, sizeof(SMqConsumerEp));
  return (void *)buf;
}

/*
 * Create a heap-allocated copy of a log object: key is copied
 * (TSDB_SUBSCRIBE_KEY_LEN bytes) and the logs array is deep-copied.
 *
 * The elements of logs are log entries (they are encoded and decoded as
 * SMqSubActionLogEntry in this file), so they are cloned with
 * tCloneSMqSubActionLogEntry instead of the consumer-endpoint clone.
 *
 * Returns NULL if the struct cannot be allocated.
 */
SMqSubActionLogObj *tCloneSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
  SMqSubActionLogObj *pLogNew = taosMemoryMalloc(sizeof(SMqSubActionLogObj));
  if (pLogNew == NULL) return pLogNew;
  memcpy(pLogNew->key, pLog->key, TSDB_SUBSCRIBE_KEY_LEN);
  pLogNew->logs = taosArrayDeepCopy(pLog->logs, (FCopy)tCloneSMqSubActionLogEntry);
  return pLogNew;
}

/*
 * Destroy the logs array of a log object via taosArrayDestroyEx.
 *
 * The elements of logs are log entries (they are encoded and decoded as
 * SMqSubActionLogEntry in this file), so the per-element callback is
 * tDeleteSMqSubActionLogEntry rather than the consumer-endpoint destructor.
 * pLog itself is not freed here.
 */
void tDeleteSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
  taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqSubActionLogEntry);
}

/*
 * Encode a log object into *buf: key (string) followed by the logs array,
 * each element encoded with tEncodeSMqSubActionLogEntry.
 * Returns the sum of the lengths reported by the encoders.
 */
int32_t tEncodeSMqSubActionLogObj(void **buf, const SMqSubActionLogObj *pLog) {
  // key first, then the array: each call advances *buf
  int32_t len = taosEncodeString(buf, pLog->key);
  len += taosEncodeArray(buf, pLog->logs, (FEncode)tEncodeSMqSubActionLogEntry);
  return len;
}

/*
 * Decode a log object from buf: key (string) followed by the logs array,
 * each element decoded with tDecodeSMqSubActionLogEntry.
 * Returns the read position after the last field.
 */
void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) {
  const void *pos = taosDecodeStringTo(buf, pLog->key);
  pos = taosDecodeArray(pos, &pLog->logs, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
  return (void *)pos;
}
L
Liu Jicong 已提交
537

S
Shengliang Guan 已提交
538 539 540 541 542 543 544 545 546 547 548
/*
 * Encode an offset object into *buf: key (string) followed by offset
 * (fixed int64). Returns the sum of the lengths reported by the encoders.
 */
int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) {
  // key first, then offset: each call advances *buf
  int32_t len = taosEncodeString(buf, pOffset->key);
  len += taosEncodeFixedI64(buf, pOffset->offset);
  return len;
}

/*
 * Decode an offset object from buf: key (string) followed by offset
 * (fixed int64). Returns the read position after the last field.
 */
void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) {
  void *pos = taosDecodeStringTo(buf, pOffset->key);
  pos = taosDecodeFixedI64(pos, &pOffset->offset);
  return pos;
}