mndDef.c 20.4 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
L
Liu Jicong 已提交
17
#include "mndDef.h"
18 19
#include "mndConsumer.h"

L
Liu Jicong 已提交
20
/*
 * Serializes a stream object into pEncoder.
 *
 * Fields are written in a fixed order; tDecodeSStreamObj reads them back in
 * the same sequence, so any change here has to be mirrored there.
 *
 * Returns pEncoder->pos once the record has been closed, or -1 as soon as
 * one of the encode steps reports a failure.
 */
int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
  if (tStartEncode(pEncoder) < 0) return -1;

  // `||` short-circuits, so fields are still emitted strictly in the listed
  // order and encoding stops at the first failing step.
  if (tEncodeCStr(pEncoder, pObj->name) < 0 ||
      tEncodeI64(pEncoder, pObj->createTime) < 0 ||
      tEncodeI64(pEncoder, pObj->updateTime) < 0 ||
      tEncodeI32(pEncoder, pObj->version) < 0 ||
      tEncodeI32(pEncoder, pObj->totalLevel) < 0 ||
      tEncodeI64(pEncoder, pObj->smaId) < 0 ||
      tEncodeI64(pEncoder, pObj->uid) < 0 ||
      tEncodeI8(pEncoder, pObj->status) < 0) {
    return -1;
  }

  // trigger configuration
  if (tEncodeI8(pEncoder, pObj->igExpired) < 0 ||
      tEncodeI8(pEncoder, pObj->trigger) < 0 ||
      tEncodeI8(pEncoder, pObj->fillHistory) < 0 ||
      tEncodeI64(pEncoder, pObj->triggerParam) < 0 ||
      tEncodeI64(pEncoder, pObj->watermark) < 0) {
    return -1;
  }

  // source / target description
  if (tEncodeI64(pEncoder, pObj->sourceDbUid) < 0 ||
      tEncodeI64(pEncoder, pObj->targetDbUid) < 0 ||
      tEncodeCStr(pEncoder, pObj->sourceDb) < 0 ||
      tEncodeCStr(pEncoder, pObj->targetDb) < 0 ||
      tEncodeCStr(pEncoder, pObj->targetSTbName) < 0 ||
      tEncodeI64(pEncoder, pObj->targetStbUid) < 0 ||
      tEncodeI32(pEncoder, pObj->fixedSinkVgId) < 0) {
    return -1;
  }

  // The three text fields may be NULL; an empty string is written in that case.
  const char *sqlText = (pObj->sql != NULL) ? pObj->sql : "";
  const char *astText = (pObj->ast != NULL) ? pObj->ast : "";
  const char *planText = (pObj->physicalPlan != NULL) ? pObj->physicalPlan : "";
  if (tEncodeCStr(pEncoder, sqlText) < 0 ||
      tEncodeCStr(pEncoder, astText) < 0 ||
      tEncodeCStr(pEncoder, planText) < 0) {
    return -1;
  }

  // tasks: an array of levels, each level an array of SStreamTask pointers.
  // Every array is preceded by its element count.
  SArray *pLevels = pObj->tasks;
  int32_t nLevels = taosArrayGetSize(pLevels);
  if (tEncodeI32(pEncoder, nLevels) < 0) return -1;
  for (int32_t lvl = 0; lvl < nLevels; ++lvl) {
    SArray *pLevelTasks = taosArrayGetP(pLevels, lvl);
    int32_t nTasks = taosArrayGetSize(pLevelTasks);
    if (tEncodeI32(pEncoder, nTasks) < 0) return -1;
    for (int32_t t = 0; t < nTasks; ++t) {
      SStreamTask *pTask = taosArrayGetP(pLevelTasks, t);
      if (tEncodeSStreamTask(pEncoder, pTask) < 0) return -1;
    }
  }

  if (tEncodeSSchemaWrapper(pEncoder, &pObj->outputSchema) < 0) return -1;

  // 3.0.20
  if (tEncodeI64(pEncoder, pObj->checkpointFreq) < 0) return -1;

  tEndEncode(pEncoder);
  return pEncoder->pos;
}

int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
L
Liu Jicong 已提交
87
  if (tStartDecode(pDecoder) < 0) return -1;
L
Liu Jicong 已提交
88
  if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1;
L
Liu Jicong 已提交
89

L
Liu Jicong 已提交
90 91 92
  if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
  if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1;
  if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1;
L
Liu Jicong 已提交
93
  if (tDecodeI32(pDecoder, &pObj->totalLevel) < 0) return -1;
L
Liu Jicong 已提交
94 95 96
  if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1;

  if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;
L
Liu Jicong 已提交
97
  if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1;
L
Liu Jicong 已提交
98

99
  if (tDecodeI8(pDecoder, &pObj->igExpired) < 0) return -1;
L
Liu Jicong 已提交
100
  if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1;
L
Liu Jicong 已提交
101
  if (tDecodeI8(pDecoder, &pObj->fillHistory) < 0) return -1;
102 103
  if (tDecodeI64(pDecoder, &pObj->triggerParam) < 0) return -1;
  if (tDecodeI64(pDecoder, &pObj->watermark) < 0) return -1;
L
Liu Jicong 已提交
104 105 106 107 108 109 110

  if (tDecodeI64(pDecoder, &pObj->sourceDbUid) < 0) return -1;
  if (tDecodeI64(pDecoder, &pObj->targetDbUid) < 0) return -1;
  if (tDecodeCStrTo(pDecoder, pObj->sourceDb) < 0) return -1;
  if (tDecodeCStrTo(pDecoder, pObj->targetDb) < 0) return -1;
  if (tDecodeCStrTo(pDecoder, pObj->targetSTbName) < 0) return -1;
  if (tDecodeI64(pDecoder, &pObj->targetStbUid) < 0) return -1;
L
Liu Jicong 已提交
111
  if (tDecodeI32(pDecoder, &pObj->fixedSinkVgId) < 0) return -1;
L
Liu Jicong 已提交
112

L
Liu Jicong 已提交
113
  if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1;
L
Liu Jicong 已提交
114
  if (tDecodeCStrAlloc(pDecoder, &pObj->ast) < 0) return -1;
L
Liu Jicong 已提交
115
  if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1;
L
Liu Jicong 已提交
116

L
Liu Jicong 已提交
117 118 119 120 121 122 123 124 125 126 127
  pObj->tasks = NULL;
  int32_t sz;
  if (tDecodeI32(pDecoder, &sz) < 0) return -1;
  if (sz != 0) {
    pObj->tasks = taosArrayInit(sz, sizeof(void *));
    for (int32_t i = 0; i < sz; i++) {
      int32_t innerSz;
      if (tDecodeI32(pDecoder, &innerSz) < 0) return -1;
      SArray *pArray = taosArrayInit(innerSz, sizeof(void *));
      for (int32_t j = 0; j < innerSz; j++) {
        SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
128 129 130 131
        if (pTask == NULL) {
          taosArrayDestroy(pArray);
          return -1;
        }
L
Liu Jicong 已提交
132 133
        if (tDecodeSStreamTask(pDecoder, pTask) < 0) {
          taosMemoryFree(pTask);
134
          taosArrayDestroy(pArray);
L
Liu Jicong 已提交
135 136
          return -1;
        }
L
Liu Jicong 已提交
137 138 139 140 141 142 143
        taosArrayPush(pArray, &pTask);
      }
      taosArrayPush(pObj->tasks, &pArray);
    }
  }

  if (tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema) < 0) return -1;
L
Liu Jicong 已提交
144

145 146 147
  // 3.0.20
  if (tDecodeI64(pDecoder, &pObj->checkpointFreq) < 0) return -1;

L
Liu Jicong 已提交
148
  tEndDecode(pDecoder);
L
Liu Jicong 已提交
149 150 151
  return 0;
}

L
Liu Jicong 已提交
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170
/*
 * Releases everything a stream object owns: the three text buffers, the
 * output schema columns (only when nCols is non-zero) and every task of
 * every level. The SStreamObj structure itself is not freed here.
 */
void tFreeStreamObj(SStreamObj *pStream) {
  taosMemoryFree(pStream->sql);
  taosMemoryFree(pStream->ast);
  taosMemoryFree(pStream->physicalPlan);

  if (pStream->outputSchema.nCols != 0) {
    taosMemoryFree(pStream->outputSchema.pSchema);
  }

  // tasks is an array of levels; each level is an array of task pointers.
  SArray *pLevels = pStream->tasks;
  int32_t nLevels = taosArrayGetSize(pLevels);
  for (int32_t lvl = 0; lvl < nLevels; ++lvl) {
    SArray *pLevelTasks = taosArrayGetP(pLevels, lvl);
    int32_t nTasks = taosArrayGetSize(pLevelTasks);
    int32_t t = 0;
    while (t < nTasks) {
      SStreamTask *pTask = taosArrayGetP(pLevelTasks, t);
      tFreeSStreamTask(pTask);
      ++t;
    }
    taosArrayDestroy(pLevelTasks);
  }
  taosArrayDestroy(pLevels);
}

L
Liu Jicong 已提交
171 172 173 174 175 176 177 178 179 180
/*
 * Creates a heap-allocated deep copy of a vgroup endpoint.
 *
 * vgId and epSet are copied by value; qmsg is duplicated so that the copy
 * owns its own string. A NULL qmsg in the source stays NULL in the copy.
 *
 * Returns the new object, or NULL when either allocation fails (nothing is
 * leaked in that case). Release the result with tDeleteSMqVgEp.
 */
SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) {
  SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp));
  if (pVgEpNew == NULL) return NULL;

  pVgEpNew->vgId = pVgEp->vgId;
  pVgEpNew->epSet = pVgEp->epSet;
  pVgEpNew->qmsg = NULL;

  if (pVgEp->qmsg != NULL) {
    // strdup(NULL) is undefined, and its result can itself be NULL on OOM.
    pVgEpNew->qmsg = strdup(pVgEp->qmsg);
    if (pVgEpNew->qmsg == NULL) {
      taosMemoryFree(pVgEpNew);
      return NULL;
    }
  }
  return pVgEpNew;
}

/*
 * Frees a vgroup endpoint together with its qmsg string.
 * A NULL pointer is accepted and ignored.
 */
void tDeleteSMqVgEp(SMqVgEp *pVgEp) {
  if (pVgEp == NULL) {
    return;
  }
  taosMemoryFreeClear(pVgEp->qmsg);
  taosMemoryFree(pVgEp);
}

/*
 * Encodes a vgroup endpoint as: vgId (fixed i32), qmsg (string), epSet.
 * Returns the sum of the lengths reported by the three encode calls.
 */
int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) {
  int32_t idLen = taosEncodeFixedI32(buf, pVgEp->vgId);
  int32_t msgLen = taosEncodeString(buf, pVgEp->qmsg);
  int32_t epLen = taosEncodeSEpSet(buf, &pVgEp->epSet);
  return idLen + msgLen + epLen;
}

/*
 * Decodes a vgroup endpoint in the order written by tEncodeSMqVgEp:
 * vgId, qmsg, epSet. Returns the read position after the last field.
 */
void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp) {
  const void *cursor = buf;
  cursor = taosDecodeFixedI32(cursor, &pVgEp->vgId);
  cursor = taosDecodeString(cursor, &pVgEp->qmsg);
  cursor = taosDecodeSEpSet(cursor, &pVgEp->epSet);
  return (void *)cursor;
}

202 203 204
SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]) {
  SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
  if (pConsumer == NULL) {
S
Shengliang Guan 已提交
205
    terrno = TSDB_CODE_OUT_OF_MEMORY;
206 207 208 209 210 211 212 213 214 215 216 217 218 219 220
    return NULL;
  }

  pConsumer->consumerId = consumerId;
  memcpy(pConsumer->cgroup, cgroup, TSDB_CGROUP_LEN);

  pConsumer->epoch = 0;
  pConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
  pConsumer->hbStatus = 0;

  taosInitRWLatch(&pConsumer->lock);

  pConsumer->currentTopics = taosArrayInit(0, sizeof(void *));
  pConsumer->rebNewTopics = taosArrayInit(0, sizeof(void *));
  pConsumer->rebRemovedTopics = taosArrayInit(0, sizeof(void *));
L
Liu Jicong 已提交
221
  pConsumer->assignedTopics = taosArrayInit(0, sizeof(void *));
222

L
Liu Jicong 已提交
223 224
  if (pConsumer->currentTopics == NULL || pConsumer->rebNewTopics == NULL || pConsumer->rebRemovedTopics == NULL ||
      pConsumer->assignedTopics == NULL) {
225 226 227
    taosArrayDestroy(pConsumer->currentTopics);
    taosArrayDestroy(pConsumer->rebNewTopics);
    taosArrayDestroy(pConsumer->rebRemovedTopics);
L
Liu Jicong 已提交
228
    taosArrayDestroy(pConsumer->assignedTopics);
229 230 231 232
    taosMemoryFree(pConsumer);
    return NULL;
  }

L
Liu Jicong 已提交
233 234
  pConsumer->upTime = taosGetTimestampMs();

235 236 237 238
  return pConsumer;
}

/*
 * Destroys the four topic arrays of a consumer, freeing every stored topic
 * string with taosMemoryFree. The SMqConsumerObj itself is not freed here.
 */
void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer) {
  SArray *topicLists[] = {
      pConsumer->currentTopics,
      pConsumer->rebNewTopics,
      pConsumer->rebRemovedTopics,
      pConsumer->assignedTopics,
  };
  const size_t nLists = sizeof(topicLists) / sizeof(topicLists[0]);

  for (size_t k = 0; k < nLists; ++k) {
    taosArrayDestroyP(topicLists[k], (FDelete)taosMemoryFree);
  }
}

int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
  int32_t tlen = 0;
  int32_t sz;
  tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
L
Liu Jicong 已提交
249
  tlen += taosEncodeString(buf, pConsumer->clientId);
250 251 252 253 254
  tlen += taosEncodeString(buf, pConsumer->cgroup);
  tlen += taosEncodeFixedI8(buf, pConsumer->updateType);
  tlen += taosEncodeFixedI32(buf, pConsumer->epoch);
  tlen += taosEncodeFixedI32(buf, pConsumer->status);

L
Liu Jicong 已提交
255 256 257 258 259 260
  tlen += taosEncodeFixedI32(buf, pConsumer->pid);
  tlen += taosEncodeSEpSet(buf, &pConsumer->ep);
  tlen += taosEncodeFixedI64(buf, pConsumer->upTime);
  tlen += taosEncodeFixedI64(buf, pConsumer->subscribeTime);
  tlen += taosEncodeFixedI64(buf, pConsumer->rebalanceTime);

261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296
  // current topics
  if (pConsumer->currentTopics) {
    sz = taosArrayGetSize(pConsumer->currentTopics);
    tlen += taosEncodeFixedI32(buf, sz);
    for (int32_t i = 0; i < sz; i++) {
      char *topic = taosArrayGetP(pConsumer->currentTopics, i);
      tlen += taosEncodeString(buf, topic);
    }
  } else {
    tlen += taosEncodeFixedI32(buf, 0);
  }

  // reb new topics
  if (pConsumer->rebNewTopics) {
    sz = taosArrayGetSize(pConsumer->rebNewTopics);
    tlen += taosEncodeFixedI32(buf, sz);
    for (int32_t i = 0; i < sz; i++) {
      char *topic = taosArrayGetP(pConsumer->rebNewTopics, i);
      tlen += taosEncodeString(buf, topic);
    }
  } else {
    tlen += taosEncodeFixedI32(buf, 0);
  }

  // reb removed topics
  if (pConsumer->rebRemovedTopics) {
    sz = taosArrayGetSize(pConsumer->rebRemovedTopics);
    tlen += taosEncodeFixedI32(buf, sz);
    for (int32_t i = 0; i < sz; i++) {
      char *topic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
      tlen += taosEncodeString(buf, topic);
    }
  } else {
    tlen += taosEncodeFixedI32(buf, 0);
  }

L
Liu Jicong 已提交
297 298 299 300 301 302 303 304 305 306 307 308
  // lost topics
  if (pConsumer->assignedTopics) {
    sz = taosArrayGetSize(pConsumer->assignedTopics);
    tlen += taosEncodeFixedI32(buf, sz);
    for (int32_t i = 0; i < sz; i++) {
      char *topic = taosArrayGetP(pConsumer->assignedTopics, i);
      tlen += taosEncodeString(buf, topic);
    }
  } else {
    tlen += taosEncodeFixedI32(buf, 0);
  }

309 310 311 312 313 314
  return tlen;
}

void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer) {
  int32_t sz;
  buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
L
Liu Jicong 已提交
315
  buf = taosDecodeStringTo(buf, pConsumer->clientId);
316 317 318 319 320
  buf = taosDecodeStringTo(buf, pConsumer->cgroup);
  buf = taosDecodeFixedI8(buf, &pConsumer->updateType);
  buf = taosDecodeFixedI32(buf, &pConsumer->epoch);
  buf = taosDecodeFixedI32(buf, &pConsumer->status);

L
Liu Jicong 已提交
321 322 323 324 325 326
  buf = taosDecodeFixedI32(buf, &pConsumer->pid);
  buf = taosDecodeSEpSet(buf, &pConsumer->ep);
  buf = taosDecodeFixedI64(buf, &pConsumer->upTime);
  buf = taosDecodeFixedI64(buf, &pConsumer->subscribeTime);
  buf = taosDecodeFixedI64(buf, &pConsumer->rebalanceTime);

327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353
  // current topics
  buf = taosDecodeFixedI32(buf, &sz);
  pConsumer->currentTopics = taosArrayInit(sz, sizeof(void *));
  for (int32_t i = 0; i < sz; i++) {
    char *topic;
    buf = taosDecodeString(buf, &topic);
    taosArrayPush(pConsumer->currentTopics, &topic);
  }

  // reb new topics
  buf = taosDecodeFixedI32(buf, &sz);
  pConsumer->rebNewTopics = taosArrayInit(sz, sizeof(void *));
  for (int32_t i = 0; i < sz; i++) {
    char *topic;
    buf = taosDecodeString(buf, &topic);
    taosArrayPush(pConsumer->rebNewTopics, &topic);
  }

  // reb removed topics
  buf = taosDecodeFixedI32(buf, &sz);
  pConsumer->rebRemovedTopics = taosArrayInit(sz, sizeof(void *));
  for (int32_t i = 0; i < sz; i++) {
    char *topic;
    buf = taosDecodeString(buf, &topic);
    taosArrayPush(pConsumer->rebRemovedTopics, &topic);
  }

L
Liu Jicong 已提交
354 355 356 357 358 359 360 361 362
  // reb removed topics
  buf = taosDecodeFixedI32(buf, &sz);
  pConsumer->assignedTopics = taosArrayInit(sz, sizeof(void *));
  for (int32_t i = 0; i < sz; i++) {
    char *topic;
    buf = taosDecodeString(buf, &topic);
    taosArrayPush(pConsumer->assignedTopics, &topic);
  }

363 364 365
  return (void *)buf;
}

366 367 368 369
SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) {
  SMqConsumerEp *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEp));
  if (pConsumerEpNew == NULL) return NULL;
  pConsumerEpNew->consumerId = pConsumerEpOld->consumerId;
H
Haojun Liao 已提交
370
  pConsumerEpNew->vgs = taosArrayDup(pConsumerEpOld->vgs, (__array_item_dup_fn_t)tCloneSMqVgEp);
371
  return pConsumerEpNew;
372 373
}

374
void tDeleteSMqConsumerEp(void *data) {
L
Liu Jicong 已提交
375
  SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)data;
376
  taosArrayDestroyP(pConsumerEp->vgs, (FDelete)tDeleteSMqVgEp);
377 378
}

379
int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
380
  int32_t tlen = 0;
381 382 383 384
  tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
  tlen += taosEncodeArray(buf, pConsumerEp->vgs, (FEncode)tEncodeSMqVgEp);
#if 0
  int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
L
Liu Jicong 已提交
385 386
  tlen += taosEncodeFixedI32(buf, sz);
  for (int32_t i = 0; i < sz; i++) {
387
    SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
L
Liu Jicong 已提交
388 389
    tlen += tEncodeSMqVgEp(buf, pVgEp);
  }
390
#endif
391 392 393
  return tlen;
}

394 395
/*
 * Decodes a consumer endpoint written by tEncodeSMqConsumerEp: consumerId,
 * then the vgs array whose SMqVgEp elements are read with tDecodeSMqVgEp.
 * Returns the read position after the last decoded field.
 */
void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp) {
  const void *cursor = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
  cursor = taosDecodeArray(cursor, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp));
  return (void *)cursor;
}

SMqSubscribeObj *tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]) {
L
Liu Jicong 已提交
412
  SMqSubscribeObj *pSubNew = taosMemoryCalloc(1, sizeof(SMqSubscribeObj));
413 414 415
  if (pSubNew == NULL) return NULL;
  memcpy(pSubNew->key, key, TSDB_SUBSCRIBE_KEY_LEN);
  taosInitRWLatch(&pSubNew->lock);
L
Liu Jicong 已提交
416
  pSubNew->vgNum = 0;
417
  pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
418 419 420 421 422
  // TODO set hash free fp
  /*taosHashSetFreeFp(pSubNew->consumerHash, tDeleteSMqConsumerEp);*/

  pSubNew->unassignedVgs = taosArrayInit(0, sizeof(void *));

423 424 425 426 427 428 429 430
  return pSubNew;
}

SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
  SMqSubscribeObj *pSubNew = taosMemoryMalloc(sizeof(SMqSubscribeObj));
  if (pSubNew == NULL) return NULL;
  memcpy(pSubNew->key, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
  taosInitRWLatch(&pSubNew->lock);
L
Liu Jicong 已提交
431

L
Liu Jicong 已提交
432
  pSubNew->dbUid = pSub->dbUid;
L
Liu Jicong 已提交
433
  pSubNew->stbUid = pSub->stbUid;
L
Liu Jicong 已提交
434
  pSubNew->subType = pSub->subType;
L
Liu Jicong 已提交
435
  pSubNew->withMeta = pSub->withMeta;
L
Liu Jicong 已提交
436

437 438
  pSubNew->vgNum = pSub->vgNum;
  pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
439 440 441 442
  // TODO set hash free fp
  /*taosHashSetFreeFp(pSubNew->consumerHash, tDeleteSMqConsumerEp);*/
  void          *pIter = NULL;
  SMqConsumerEp *pConsumerEp = NULL;
443
  while (1) {
L
Liu Jicong 已提交
444
    pIter = taosHashIterate(pSub->consumerHash, pIter);
445
    if (pIter == NULL) break;
446 447 448
    pConsumerEp = (SMqConsumerEp *)pIter;
    SMqConsumerEp newEp = {
        .consumerId = pConsumerEp->consumerId,
H
Haojun Liao 已提交
449
        .vgs = taosArrayDup(pConsumerEp->vgs, (__array_item_dup_fn_t)tCloneSMqVgEp),
450
    };
451
    taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp, sizeof(SMqConsumerEp));
452
  }
H
Haojun Liao 已提交
453
  pSubNew->unassignedVgs = taosArrayDup(pSub->unassignedVgs, (__array_item_dup_fn_t)tCloneSMqVgEp);
L
Liu Jicong 已提交
454
  memcpy(pSubNew->dbName, pSub->dbName, TSDB_DB_FNAME_LEN);
455 456 457 458
  return pSubNew;
}

/*
 * Releases the contents of a subscription object: the vgs array of every
 * consumer endpoint stored in the hash, the hash itself and the
 * unassignedVgs array. The SMqSubscribeObj structure is not freed here.
 */
void tDeleteSubscribeObj(SMqSubscribeObj *pSub) {
  // Walk the hash by hand so each entry's vgs array is destroyed before cleanup.
  for (void *pIter = taosHashIterate(pSub->consumerHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pSub->consumerHash, pIter)) {
    SMqConsumerEp *pEp = (SMqConsumerEp *)pIter;
    taosArrayDestroyP(pEp->vgs, (FDelete)tDeleteSMqVgEp);
  }

  taosHashCleanup(pSub->consumerHash);
  taosArrayDestroyP(pSub->unassignedVgs, (FDelete)tDeleteSMqVgEp);
}

int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
  int32_t tlen = 0;
  tlen += taosEncodeString(buf, pSub->key);
L
Liu Jicong 已提交
473
  tlen += taosEncodeFixedI64(buf, pSub->dbUid);
474
  tlen += taosEncodeFixedI32(buf, pSub->vgNum);
L
Liu Jicong 已提交
475
  tlen += taosEncodeFixedI8(buf, pSub->subType);
L
Liu Jicong 已提交
476
  tlen += taosEncodeFixedI8(buf, pSub->withMeta);
L
Liu Jicong 已提交
477
  tlen += taosEncodeFixedI64(buf, pSub->stbUid);
478 479 480 481 482 483 484 485 486

  void   *pIter = NULL;
  int32_t sz = taosHashGetSize(pSub->consumerHash);
  tlen += taosEncodeFixedI32(buf, sz);

  int32_t cnt = 0;
  while (1) {
    pIter = taosHashIterate(pSub->consumerHash, pIter);
    if (pIter == NULL) break;
487 488
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    tlen += tEncodeSMqConsumerEp(buf, pConsumerEp);
489 490 491
    cnt++;
  }
  ASSERT(cnt == sz);
492
  tlen += taosEncodeArray(buf, pSub->unassignedVgs, (FEncode)tEncodeSMqVgEp);
L
Liu Jicong 已提交
493
  tlen += taosEncodeString(buf, pSub->dbName);
494 495 496 497 498 499
  return tlen;
}

/*
 * Decodes a subscription object in the order written by tEncodeSubscribeObj.
 *
 * A new consumer hash is created and each decoded SMqConsumerEp is stored in
 * it by value, keyed by consumerId; unassignedVgs is rebuilt from the buffer.
 *
 * Returns the read position after the last decoded field.
 */
void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) {
  const void *cursor = buf;

  cursor = taosDecodeStringTo(cursor, pSub->key);
  cursor = taosDecodeFixedI64(cursor, &pSub->dbUid);
  cursor = taosDecodeFixedI32(cursor, &pSub->vgNum);
  cursor = taosDecodeFixedI8(cursor, &pSub->subType);
  cursor = taosDecodeFixedI8(cursor, &pSub->withMeta);
  cursor = taosDecodeFixedI64(cursor, &pSub->stbUid);

  int32_t nConsumers;
  cursor = taosDecodeFixedI32(cursor, &nConsumers);

  // NOTE(review): the third taosHashInit argument is `true` here but `false`
  // in tNewSubscribeObj / tCloneSubscribeObj - confirm that is intended.
  pSub->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
  for (int32_t idx = 0; idx < nConsumers; ++idx) {
    SMqConsumerEp ep = {0};
    cursor = tDecodeSMqConsumerEp(cursor, &ep);
    taosHashPut(pSub->consumerHash, &ep.consumerId, sizeof(int64_t), &ep, sizeof(SMqConsumerEp));
  }

  cursor = taosDecodeArray(cursor, &pSub->unassignedVgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp));
  cursor = taosDecodeStringTo(cursor, pSub->dbName);
  return (void *)cursor;
}

/*
 * Creates a heap-allocated copy of a log entry: epoch is copied by value and
 * consumers is duplicated with tCloneSMqConsumerEp applied to each element.
 * Returns NULL when the structure itself cannot be allocated.
 */
SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
  SMqSubActionLogEntry *pCopy = taosMemoryMalloc(sizeof(SMqSubActionLogEntry));
  if (pCopy != NULL) {
    pCopy->epoch = pEntry->epoch;
    pCopy->consumers = taosArrayDup(pEntry->consumers, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
  }
  return pCopy;
}

/*
 * Destroys the consumers array of a log entry, passing each element to
 * tDeleteSMqConsumerEp. The entry structure itself is not freed.
 */
void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
  SArray *pConsumers = pEntry->consumers;
  taosArrayDestroyEx(pConsumers, (FDelete)tDeleteSMqConsumerEp);
}

int32_t tEncodeSMqSubActionLogEntry(void **buf, const SMqSubActionLogEntry *pEntry) {
  int32_t tlen = 0;
  tlen += taosEncodeFixedI32(buf, pEntry->epoch);
  tlen += taosEncodeArray(buf, pEntry->consumers, (FEncode)tEncodeSMqSubActionLogEntry);
  return tlen;
}
L
Liu Jicong 已提交
539

540 541 542 543 544 545 546 547 548 549
/*
 * Decodes a log entry: epoch (fixed i32) followed by the consumers array.
 *
 * The elements of `consumers` are consumer endpoints, so each one is read
 * with tDecodeSMqConsumerEp and sized as SMqConsumerEp, mirroring
 * tEncodeSMqSubActionLogEntry. Using this function itself as the element
 * decoder would recurse on data of the wrong type.
 *
 * Returns the read position after the last decoded field.
 */
void *tDecodeSMqSubActionLogEntry(const void *buf, SMqSubActionLogEntry *pEntry) {
  buf = taosDecodeFixedI32(buf, &pEntry->epoch);
  buf = taosDecodeArray(buf, &pEntry->consumers, (FDecode)tDecodeSMqConsumerEp, sizeof(SMqConsumerEp));
  return (void *)buf;
}

/*
 * Creates a heap-allocated copy of an action log object.
 *
 * key is copied by value (TSDB_SUBSCRIBE_KEY_LEN bytes). The elements of
 * `logs` are log entries - tEncodeSMqSubActionLogObj and
 * tDecodeSMqSubActionLogObj handle them as SMqSubActionLogEntry - so each one
 * is duplicated with tCloneSMqSubActionLogEntry rather than the consumer
 * endpoint clone.
 *
 * Returns NULL when the structure itself cannot be allocated.
 */
SMqSubActionLogObj *tCloneSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
  SMqSubActionLogObj *pLogNew = taosMemoryMalloc(sizeof(SMqSubActionLogObj));
  if (pLogNew == NULL) return pLogNew;
  memcpy(pLogNew->key, pLog->key, TSDB_SUBSCRIBE_KEY_LEN);
  pLogNew->logs = taosArrayDup(pLog->logs, (__array_item_dup_fn_t)tCloneSMqSubActionLogEntry);
  return pLogNew;
}

/*
 * Destroys the logs array of an action log object. The log object structure
 * itself is not freed.
 *
 * The elements of `logs` are log entries (they are encoded and decoded as
 * SMqSubActionLogEntry), so each one is released with
 * tDeleteSMqSubActionLogEntry instead of the consumer endpoint deleter.
 *
 * NOTE(review): whether taosArrayDestroyEx hands the callback the stored
 * element or the address of its slot is not visible here - confirm it matches
 * how `logs` stores its entries.
 */
void tDeleteSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
  taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqSubActionLogEntry);
}

/*
 * Encodes an action log object as key (string) followed by the logs array,
 * whose elements are written with tEncodeSMqSubActionLogEntry.
 * Returns the sum of the lengths reported by the two encode calls.
 */
int32_t tEncodeSMqSubActionLogObj(void **buf, const SMqSubActionLogObj *pLog) {
  int32_t keyLen = taosEncodeString(buf, pLog->key);
  int32_t logsLen = taosEncodeArray(buf, pLog->logs, (FEncode)tEncodeSMqSubActionLogEntry);
  return keyLen + logsLen;
}

/*
 * Decodes an action log object written by tEncodeSMqSubActionLogObj: key,
 * then the logs array of SMqSubActionLogEntry elements.
 * Returns the read position after the last decoded field.
 */
void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) {
  const void *cursor = taosDecodeStringTo(buf, pLog->key);
  cursor = taosDecodeArray(cursor, &pLog->logs, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
  return (void *)cursor;
}
L
Liu Jicong 已提交
570

S
Shengliang Guan 已提交
571 572 573 574 575 576 577 578 579 580 581
/*
 * Encodes an offset object as key (string) followed by offset (fixed i64).
 * Returns the sum of the lengths reported by the two encode calls.
 */
int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) {
  int32_t keyLen = taosEncodeString(buf, pOffset->key);
  int32_t offLen = taosEncodeFixedI64(buf, pOffset->offset);
  return keyLen + offLen;
}

/*
 * Decodes an offset object written by tEncodeSMqOffsetObj: key, then offset.
 * Returns the read position after the last decoded field.
 */
void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) {
  void *cursor = buf;
  cursor = taosDecodeStringTo(cursor, pOffset->key);
  cursor = taosDecodeFixedI64(cursor, &pOffset->offset);
  return cursor;
}