mndDef.c 25.2 KB
Newer Older
L
Liu Jicong 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
L
Liu Jicong 已提交
17
#include "mndDef.h"
18 19
#include "mndConsumer.h"

L
Liu Jicong 已提交
20
// Serializes a stream object into pEncoder.
//
// Fields are written in a fixed order that tDecodeSStreamObj reads back. A NULL
// sql / ast / physicalPlan pointer is written as the empty string "".
//
// Returns pEncoder->pos after tEndEncode on success, or -1 as soon as any
// individual encode call reports a negative result.
int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
  if (tStartEncode(pEncoder) < 0) return -1;

  // name, timestamps and general bookkeeping
  if (tEncodeCStr(pEncoder, pObj->name) < 0 || tEncodeI64(pEncoder, pObj->createTime) < 0 ||
      tEncodeI64(pEncoder, pObj->updateTime) < 0 || tEncodeI32(pEncoder, pObj->version) < 0 ||
      tEncodeI32(pEncoder, pObj->totalLevel) < 0 || tEncodeI64(pEncoder, pObj->smaId) < 0 ||
      tEncodeI64(pEncoder, pObj->uid) < 0 || tEncodeI8(pEncoder, pObj->status) < 0) {
    return -1;
  }

  // stream configuration
  if (tEncodeI8(pEncoder, pObj->conf.igExpired) < 0 || tEncodeI8(pEncoder, pObj->conf.trigger) < 0 ||
      tEncodeI8(pEncoder, pObj->conf.fillHistory) < 0 || tEncodeI64(pEncoder, pObj->conf.triggerParam) < 0 ||
      tEncodeI64(pEncoder, pObj->conf.watermark) < 0) {
    return -1;
  }

  // source / target description
  if (tEncodeI64(pEncoder, pObj->sourceDbUid) < 0 || tEncodeI64(pEncoder, pObj->targetDbUid) < 0 ||
      tEncodeCStr(pEncoder, pObj->sourceDb) < 0 || tEncodeCStr(pEncoder, pObj->targetDb) < 0 ||
      tEncodeCStr(pEncoder, pObj->targetSTbName) < 0 || tEncodeI64(pEncoder, pObj->targetStbUid) < 0 ||
      tEncodeI32(pEncoder, pObj->fixedSinkVgId) < 0) {
    return -1;
  }

  // optional strings: a missing string is stored as ""
  if (tEncodeCStr(pEncoder, (pObj->sql != NULL) ? pObj->sql : "") < 0) return -1;
  if (tEncodeCStr(pEncoder, (pObj->ast != NULL) ? pObj->ast : "") < 0) return -1;
  if (tEncodeCStr(pEncoder, (pObj->physicalPlan != NULL) ? pObj->physicalPlan : "") < 0) return -1;

  // tasks: outer count, then for each level its count followed by its tasks
  int32_t numLevels = taosArrayGetSize(pObj->tasks);
  if (tEncodeI32(pEncoder, numLevels) < 0) return -1;
  for (int32_t level = 0; level < numLevels; level++) {
    SArray *pLevelTasks = taosArrayGetP(pObj->tasks, level);
    int32_t numTasks = taosArrayGetSize(pLevelTasks);
    if (tEncodeI32(pEncoder, numTasks) < 0) return -1;
    for (int32_t t = 0; t < numTasks; t++) {
      SStreamTask *pStreamTask = taosArrayGetP(pLevelTasks, t);
      if (tEncodeStreamTask(pEncoder, pStreamTask) < 0) return -1;
    }
  }

  if (tEncodeSSchemaWrapper(pEncoder, &pObj->outputSchema) < 0) return -1;

  // 3.0.20
  if (tEncodeI64(pEncoder, pObj->checkpointFreq) < 0) return -1;
  if (tEncodeI8(pEncoder, pObj->igCheckUpdate) < 0) return -1;

  tEndEncode(pEncoder);
  return pEncoder->pos;
}

87
// Deserializes a stream object from pDecoder into pObj.
//
// Reads the fields in the order written by tEncodeSStreamObj. sql, ast and
// physicalPlan are allocated by tDecodeCStrAlloc. pObj->tasks is rebuilt as an
// array of per-level arrays of SStreamTask pointers, or left NULL when the
// encoded level count is 0.
//
// checkpointFreq is only read when sver >= 2; igCheckUpdate is additionally
// only read when the decoder has not reached its end.
//
// Returns 0 on success, -1 on the first decode or allocation failure.
// On failure the level array being built and the tasks already placed in it
// are released here. Anything already attached to pObj (strings, completed
// levels in pObj->tasks) is left in place.
// NOTE(review): presumably the caller releases those through tFreeStreamObj - confirm.
int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
  if (tStartDecode(pDecoder) < 0) return -1;
  if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1;

  if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
  if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1;
  if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1;
  if (tDecodeI32(pDecoder, &pObj->totalLevel) < 0) return -1;
  if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1;

  if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;
  if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1;

  if (tDecodeI8(pDecoder, &pObj->conf.igExpired) < 0) return -1;
  if (tDecodeI8(pDecoder, &pObj->conf.trigger) < 0) return -1;
  if (tDecodeI8(pDecoder, &pObj->conf.fillHistory) < 0) return -1;
  if (tDecodeI64(pDecoder, &pObj->conf.triggerParam) < 0) return -1;
  if (tDecodeI64(pDecoder, &pObj->conf.watermark) < 0) return -1;

  if (tDecodeI64(pDecoder, &pObj->sourceDbUid) < 0) return -1;
  if (tDecodeI64(pDecoder, &pObj->targetDbUid) < 0) return -1;
  if (tDecodeCStrTo(pDecoder, pObj->sourceDb) < 0) return -1;
  if (tDecodeCStrTo(pDecoder, pObj->targetDb) < 0) return -1;
  if (tDecodeCStrTo(pDecoder, pObj->targetSTbName) < 0) return -1;
  if (tDecodeI64(pDecoder, &pObj->targetStbUid) < 0) return -1;
  if (tDecodeI32(pDecoder, &pObj->fixedSinkVgId) < 0) return -1;

  if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1;
  if (tDecodeCStrAlloc(pDecoder, &pObj->ast) < 0) return -1;
  if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1;

  pObj->tasks = NULL;
  int32_t sz = 0;
  if (tDecodeI32(pDecoder, &sz) < 0) return -1;
  if (sz != 0) {
    pObj->tasks = taosArrayInit(sz, sizeof(void *));
    if (pObj->tasks == NULL) return -1;

    for (int32_t i = 0; i < sz; i++) {
      int32_t innerSz = 0;
      if (tDecodeI32(pDecoder, &innerSz) < 0) return -1;

      SArray *pArray = taosArrayInit(innerSz, sizeof(void *));
      if (pArray == NULL) return -1;

      for (int32_t j = 0; j < innerSz; j++) {
        SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
        int32_t      failed = (pTask == NULL);

        if (!failed && tDecodeStreamTask(pDecoder, pTask) < 0) {
          taosMemoryFree(pTask);
          failed = 1;
        }

        if (failed) {
          // pArray is not yet reachable from pObj, so the tasks it already
          // holds must be released here or they are leaked.
          int32_t done = taosArrayGetSize(pArray);
          for (int32_t k = 0; k < done; k++) {
            SStreamTask *pDecoded = taosArrayGetP(pArray, k);
            tFreeStreamTask(pDecoded);
          }
          taosArrayDestroy(pArray);
          return -1;
        }

        taosArrayPush(pArray, &pTask);
      }
      taosArrayPush(pObj->tasks, &pArray);
    }
  }

  if (tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema) < 0) return -1;

  // 3.0.20
  if (sver >= 2) {
    if (tDecodeI64(pDecoder, &pObj->checkpointFreq) < 0) return -1;
    if (!tDecodeIsEnd(pDecoder)) {
      if (tDecodeI8(pDecoder, &pObj->igCheckUpdate) < 0) return -1;
    }
  }
  tEndDecode(pDecoder);
  return 0;
}

L
Liu Jicong 已提交
157 158 159 160
// Releases the memory owned by a stream object: its sql / ast / physicalPlan
// strings, the output and tag schema column arrays (only when their nCols is
// non-zero / positive respectively), every task of every level in
// pStream->tasks, the level arrays themselves, and the pHTasksList array.
//
// pStream itself is not freed. The elements of pHTasksList are not touched,
// only the array is destroyed.
void tFreeStreamObj(SStreamObj *pStream) {
  taosMemoryFree(pStream->sql);
  taosMemoryFree(pStream->ast);
  taosMemoryFree(pStream->physicalPlan);

  if (pStream->outputSchema.nCols != 0) {
    taosMemoryFree(pStream->outputSchema.pSchema);
  }

  int32_t numLevels = taosArrayGetSize(pStream->tasks);
  for (int32_t level = 0; level < numLevels; level++) {
    SArray *pLevelTasks = taosArrayGetP(pStream->tasks, level);

    int32_t numTasks = taosArrayGetSize(pLevelTasks);
    for (int32_t t = 0; t < numTasks; t++) {
      SStreamTask *pStreamTask = taosArrayGetP(pLevelTasks, t);
      tFreeStreamTask(pStreamTask);
    }

    taosArrayDestroy(pLevelTasks);
  }

  taosArrayDestroy(pStream->tasks);
  taosArrayDestroy(pStream->pHTasksList);

  // tagSchema.pSchema
  if (pStream->tagSchema.nCols > 0) {
    taosMemoryFree(pStream->tagSchema.pSchema);
  }
}

L
Liu Jicong 已提交
187 188 189 190
SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) {
  SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp));
  if (pVgEpNew == NULL) return NULL;
  pVgEpNew->vgId = pVgEp->vgId;
wmmhello's avatar
wmmhello 已提交
191
//  pVgEpNew->qmsg = taosStrdup(pVgEp->qmsg);
L
Liu Jicong 已提交
192 193 194 195 196
  pVgEpNew->epSet = pVgEp->epSet;
  return pVgEpNew;
}

// Frees an SMqVgEp previously allocated on the heap. A NULL pointer is ignored.
void tDeleteSMqVgEp(SMqVgEp *pVgEp) {
  if (pVgEp == NULL) {
    return;
  }
  taosMemoryFree(pVgEp);
}

int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) {
  int32_t tlen = 0;
  tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
206
//  tlen += taosEncodeString(buf, pVgEp->qmsg);
L
Liu Jicong 已提交
207 208 209 210
  tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
  return tlen;
}

211
// Decodes an SMqVgEp from buf: vgId (fixed int32) followed by the epSet.
//
// When sver == 1, a variable-length block sits between the two fields: a
// varint length followed by that many bytes. It is read past and discarded.
// NOTE(review): presumably this is the legacy qmsg string - confirm.
//
// Returns the read position after the decoded data.
void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) {
  const void *cursor = taosDecodeFixedI32(buf, &pVgEp->vgId);

  if (sver == 1) {
    uint64_t skipLen = 0;
    cursor = taosDecodeVariantU64(cursor, &skipLen);
    cursor = POINTER_SHIFT(cursor, skipLen);
  }

  cursor = taosDecodeSEpSet(cursor, &pVgEp->epSet);
  return (void *)cursor;
}

222 223 224
SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]) {
  SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
  if (pConsumer == NULL) {
S
Shengliang Guan 已提交
225
    terrno = TSDB_CODE_OUT_OF_MEMORY;
226 227 228 229 230 231 232
    return NULL;
  }

  pConsumer->consumerId = consumerId;
  memcpy(pConsumer->cgroup, cgroup, TSDB_CGROUP_LEN);

  pConsumer->epoch = 0;
233
  pConsumer->status = MQ_CONSUMER_STATUS_REBALANCE;
234 235 236 237 238 239 240
  pConsumer->hbStatus = 0;

  taosInitRWLatch(&pConsumer->lock);

  pConsumer->currentTopics = taosArrayInit(0, sizeof(void *));
  pConsumer->rebNewTopics = taosArrayInit(0, sizeof(void *));
  pConsumer->rebRemovedTopics = taosArrayInit(0, sizeof(void *));
L
Liu Jicong 已提交
241
  pConsumer->assignedTopics = taosArrayInit(0, sizeof(void *));
242

L
Liu Jicong 已提交
243 244
  if (pConsumer->currentTopics == NULL || pConsumer->rebNewTopics == NULL || pConsumer->rebRemovedTopics == NULL ||
      pConsumer->assignedTopics == NULL) {
245 246 247
    taosArrayDestroy(pConsumer->currentTopics);
    taosArrayDestroy(pConsumer->rebNewTopics);
    taosArrayDestroy(pConsumer->rebRemovedTopics);
L
Liu Jicong 已提交
248
    taosArrayDestroy(pConsumer->assignedTopics);
249 250 251 252
    taosMemoryFree(pConsumer);
    return NULL;
  }

L
Liu Jicong 已提交
253 254
  pConsumer->upTime = taosGetTimestampMs();

255 256 257 258
  return pConsumer;
}

// Destroys the four topic arrays of a consumer object (current, rebNew,
// rebRemoved, assigned), freeing each stored topic pointer with
// taosMemoryFree. pConsumer itself is not freed.
void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer) {
  FDelete freeTopic = (FDelete)taosMemoryFree;

  taosArrayDestroyP(pConsumer->currentTopics, freeTopic);
  taosArrayDestroyP(pConsumer->rebNewTopics, freeTopic);
  taosArrayDestroyP(pConsumer->rebRemovedTopics, freeTopic);
  taosArrayDestroyP(pConsumer->assignedTopics, freeTopic);
}

int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
  int32_t tlen = 0;
  int32_t sz;
  tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
L
Liu Jicong 已提交
269
  tlen += taosEncodeString(buf, pConsumer->clientId);
270 271 272 273 274
  tlen += taosEncodeString(buf, pConsumer->cgroup);
  tlen += taosEncodeFixedI8(buf, pConsumer->updateType);
  tlen += taosEncodeFixedI32(buf, pConsumer->epoch);
  tlen += taosEncodeFixedI32(buf, pConsumer->status);

L
Liu Jicong 已提交
275 276 277 278 279 280
  tlen += taosEncodeFixedI32(buf, pConsumer->pid);
  tlen += taosEncodeSEpSet(buf, &pConsumer->ep);
  tlen += taosEncodeFixedI64(buf, pConsumer->upTime);
  tlen += taosEncodeFixedI64(buf, pConsumer->subscribeTime);
  tlen += taosEncodeFixedI64(buf, pConsumer->rebalanceTime);

281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316
  // current topics
  if (pConsumer->currentTopics) {
    sz = taosArrayGetSize(pConsumer->currentTopics);
    tlen += taosEncodeFixedI32(buf, sz);
    for (int32_t i = 0; i < sz; i++) {
      char *topic = taosArrayGetP(pConsumer->currentTopics, i);
      tlen += taosEncodeString(buf, topic);
    }
  } else {
    tlen += taosEncodeFixedI32(buf, 0);
  }

  // reb new topics
  if (pConsumer->rebNewTopics) {
    sz = taosArrayGetSize(pConsumer->rebNewTopics);
    tlen += taosEncodeFixedI32(buf, sz);
    for (int32_t i = 0; i < sz; i++) {
      char *topic = taosArrayGetP(pConsumer->rebNewTopics, i);
      tlen += taosEncodeString(buf, topic);
    }
  } else {
    tlen += taosEncodeFixedI32(buf, 0);
  }

  // reb removed topics
  if (pConsumer->rebRemovedTopics) {
    sz = taosArrayGetSize(pConsumer->rebRemovedTopics);
    tlen += taosEncodeFixedI32(buf, sz);
    for (int32_t i = 0; i < sz; i++) {
      char *topic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
      tlen += taosEncodeString(buf, topic);
    }
  } else {
    tlen += taosEncodeFixedI32(buf, 0);
  }

L
Liu Jicong 已提交
317 318 319 320 321 322 323 324 325 326 327 328
  // lost topics
  if (pConsumer->assignedTopics) {
    sz = taosArrayGetSize(pConsumer->assignedTopics);
    tlen += taosEncodeFixedI32(buf, sz);
    for (int32_t i = 0; i < sz; i++) {
      char *topic = taosArrayGetP(pConsumer->assignedTopics, i);
      tlen += taosEncodeString(buf, topic);
    }
  } else {
    tlen += taosEncodeFixedI32(buf, 0);
  }

329 330 331 332
  tlen += taosEncodeFixedI8(buf, pConsumer->withTbName);
  tlen += taosEncodeFixedI8(buf, pConsumer->autoCommit);
  tlen += taosEncodeFixedI32(buf, pConsumer->autoCommitInterval);
  tlen += taosEncodeFixedI32(buf, pConsumer->resetOffsetCfg);
333 334 335
  return tlen;
}

336
void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t sver) {
337 338
  int32_t sz;
  buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
L
Liu Jicong 已提交
339
  buf = taosDecodeStringTo(buf, pConsumer->clientId);
340 341 342 343 344
  buf = taosDecodeStringTo(buf, pConsumer->cgroup);
  buf = taosDecodeFixedI8(buf, &pConsumer->updateType);
  buf = taosDecodeFixedI32(buf, &pConsumer->epoch);
  buf = taosDecodeFixedI32(buf, &pConsumer->status);

L
Liu Jicong 已提交
345 346 347 348 349 350
  buf = taosDecodeFixedI32(buf, &pConsumer->pid);
  buf = taosDecodeSEpSet(buf, &pConsumer->ep);
  buf = taosDecodeFixedI64(buf, &pConsumer->upTime);
  buf = taosDecodeFixedI64(buf, &pConsumer->subscribeTime);
  buf = taosDecodeFixedI64(buf, &pConsumer->rebalanceTime);

351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377
  // current topics
  buf = taosDecodeFixedI32(buf, &sz);
  pConsumer->currentTopics = taosArrayInit(sz, sizeof(void *));
  for (int32_t i = 0; i < sz; i++) {
    char *topic;
    buf = taosDecodeString(buf, &topic);
    taosArrayPush(pConsumer->currentTopics, &topic);
  }

  // reb new topics
  buf = taosDecodeFixedI32(buf, &sz);
  pConsumer->rebNewTopics = taosArrayInit(sz, sizeof(void *));
  for (int32_t i = 0; i < sz; i++) {
    char *topic;
    buf = taosDecodeString(buf, &topic);
    taosArrayPush(pConsumer->rebNewTopics, &topic);
  }

  // reb removed topics
  buf = taosDecodeFixedI32(buf, &sz);
  pConsumer->rebRemovedTopics = taosArrayInit(sz, sizeof(void *));
  for (int32_t i = 0; i < sz; i++) {
    char *topic;
    buf = taosDecodeString(buf, &topic);
    taosArrayPush(pConsumer->rebRemovedTopics, &topic);
  }

L
Liu Jicong 已提交
378 379 380 381 382 383 384 385 386
  // reb removed topics
  buf = taosDecodeFixedI32(buf, &sz);
  pConsumer->assignedTopics = taosArrayInit(sz, sizeof(void *));
  for (int32_t i = 0; i < sz; i++) {
    char *topic;
    buf = taosDecodeString(buf, &topic);
    taosArrayPush(pConsumer->assignedTopics, &topic);
  }

387 388 389 390 391 392
  if(sver > 1){
    buf = taosDecodeFixedI8(buf, &pConsumer->withTbName);
    buf = taosDecodeFixedI8(buf, &pConsumer->autoCommit);
    buf = taosDecodeFixedI32(buf, &pConsumer->autoCommitInterval);
    buf = taosDecodeFixedI32(buf, &pConsumer->resetOffsetCfg);
  }
393 394 395
  return (void *)buf;
}

396 397 398 399
//SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) {
//  SMqConsumerEp *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEp));
//  if (pConsumerEpNew == NULL) return NULL;
//  pConsumerEpNew->consumerId = pConsumerEpOld->consumerId;
400
//  pConsumerEpNew->vgs = taosArrayDup(pConsumerEpOld->vgs, NULL);
401 402 403 404 405
//  return pConsumerEpNew;
//}
//
//void tDeleteSMqConsumerEp(void *data) {
//  SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)data;
406
//  taosArrayDestroy(pConsumerEp->vgs);
407
//}
408

409
int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
410
  int32_t tlen = 0;
411 412
  tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
  tlen += taosEncodeArray(buf, pConsumerEp->vgs, (FEncode)tEncodeSMqVgEp);
413 414 415 416 417 418 419 420 421 422 423 424 425 426 427
  int32_t szVgs = taosArrayGetSize(pConsumerEp->offsetRows);
  tlen += taosEncodeFixedI32(buf, szVgs);
  for (int32_t j= 0; j < szVgs; ++j) {
    OffsetRows *offRows = taosArrayGet(pConsumerEp->offsetRows, j);
    tlen += taosEncodeFixedI32(buf, offRows->vgId);
    tlen += taosEncodeFixedI64(buf, offRows->rows);
    tlen += taosEncodeFixedI8(buf, offRows->offset.type);
    if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
      tlen += taosEncodeFixedI64(buf, offRows->offset.uid);
      tlen += taosEncodeFixedI64(buf, offRows->offset.ts);
    } else if (offRows->offset.type == TMQ_OFFSET__LOG) {
      tlen += taosEncodeFixedI64(buf, offRows->offset.version);
    } else {
      // do nothing
    }
L
Liu Jicong 已提交
428
  }
429 430 431 432 433 434 435 436
//#if 0
//  int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
//  tlen += taosEncodeFixedI32(buf, sz);
//  for (int32_t i = 0; i < sz; i++) {
//    SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
//    tlen += tEncodeSMqVgEp(buf, pVgEp);
//  }
//#endif
437 438 439
  return tlen;
}

440
// Decodes a consumer endpoint from buf: consumerId, then the vgs array (each
// element via tDecodeSMqVgEp, which receives sver).
//
// When sver > 1 an offset-row list follows: an int32 count, then per entry
// vgId, rows, offset.type and a type-dependent payload (uid + ts for the two
// snapshot types, version for the log type, nothing otherwise).
// pConsumerEp->offsetRows is only created when that count is positive.
//
// Returns the read position after the decoded data, or NULL when the
// offsetRows array or one of its slots cannot be allocated. On NULL, whatever
// was already attached to pConsumerEp is left in place for the caller.
void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t sver) {
  buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
  buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver);
  if (sver > 1) {
    int32_t szVgs = 0;
    buf = taosDecodeFixedI32(buf, &szVgs);
    if (szVgs > 0) {
      pConsumerEp->offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows));
      if (NULL == pConsumerEp->offsetRows) return NULL;
      for (int32_t j = 0; j < szVgs; ++j) {
        OffsetRows *offRows = taosArrayReserve(pConsumerEp->offsetRows, 1);
        // Without a slot there is nowhere to decode into; fail like the
        // array allocation above instead of dereferencing NULL.
        if (NULL == offRows) return NULL;
        buf = taosDecodeFixedI32(buf, &offRows->vgId);
        buf = taosDecodeFixedI64(buf, &offRows->rows);
        buf = taosDecodeFixedI8(buf, &offRows->offset.type);
        if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
          buf = taosDecodeFixedI64(buf, &offRows->offset.uid);
          buf = taosDecodeFixedI64(buf, &offRows->offset.ts);
        } else if (offRows->offset.type == TMQ_OFFSET__LOG) {
          buf = taosDecodeFixedI64(buf, &offRows->offset.version);
        } else {
          // other offset types carry no payload
        }
      }
    }
  }

  return (void *)buf;
}

479 480 481 482 483
SMqSubscribeObj *tNewSubscribeObj(const char* key) {
  SMqSubscribeObj *pSubObj = taosMemoryCalloc(1, sizeof(SMqSubscribeObj));
  if (pSubObj == NULL) {
    return NULL;
  }
484

485 486 487 488
  memcpy(pSubObj->key, key, TSDB_SUBSCRIBE_KEY_LEN);
  taosInitRWLatch(&pSubObj->lock);
  pSubObj->vgNum = 0;
  pSubObj->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
489

490 491 492 493
  // TODO set hash free fp
  /*taosHashSetFreeFp(pSubObj->consumerHash, tDeleteSMqConsumerEp);*/
  pSubObj->unassignedVgs = taosArrayInit(0, POINTER_BYTES);
  return pSubObj;
494 495 496 497 498 499 500
}

SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
  SMqSubscribeObj *pSubNew = taosMemoryMalloc(sizeof(SMqSubscribeObj));
  if (pSubNew == NULL) return NULL;
  memcpy(pSubNew->key, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
  taosInitRWLatch(&pSubNew->lock);
L
Liu Jicong 已提交
501

L
Liu Jicong 已提交
502
  pSubNew->dbUid = pSub->dbUid;
L
Liu Jicong 已提交
503
  pSubNew->stbUid = pSub->stbUid;
L
Liu Jicong 已提交
504
  pSubNew->subType = pSub->subType;
L
Liu Jicong 已提交
505
  pSubNew->withMeta = pSub->withMeta;
L
Liu Jicong 已提交
506

507 508
  pSubNew->vgNum = pSub->vgNum;
  pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
509 510 511 512
  // TODO set hash free fp
  /*taosHashSetFreeFp(pSubNew->consumerHash, tDeleteSMqConsumerEp);*/
  void          *pIter = NULL;
  SMqConsumerEp *pConsumerEp = NULL;
513
  while (1) {
L
Liu Jicong 已提交
514
    pIter = taosHashIterate(pSub->consumerHash, pIter);
515
    if (pIter == NULL) break;
516 517 518
    pConsumerEp = (SMqConsumerEp *)pIter;
    SMqConsumerEp newEp = {
        .consumerId = pConsumerEp->consumerId,
H
Haojun Liao 已提交
519
        .vgs = taosArrayDup(pConsumerEp->vgs, (__array_item_dup_fn_t)tCloneSMqVgEp),
520
    };
521
    taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp, sizeof(SMqConsumerEp));
522
  }
H
Haojun Liao 已提交
523
  pSubNew->unassignedVgs = taosArrayDup(pSub->unassignedVgs, (__array_item_dup_fn_t)tCloneSMqVgEp);
524
  pSubNew->offsetRows = taosArrayDup(pSub->offsetRows, NULL);
L
Liu Jicong 已提交
525
  memcpy(pSubNew->dbName, pSub->dbName, TSDB_DB_FNAME_LEN);
526
  pSubNew->qmsg = taosStrdup(pSub->qmsg);
527 528 529 530
  return pSubNew;
}

// Releases everything owned by a subscription object: for each consumer in the
// hash its vgs array (elements freed with tDeleteSMqVgEp) and offsetRows
// array, then the hash itself, the unassignedVgs array and its elements, qmsg,
// and the subscription-level offsetRows. pSub itself is not freed.
void tDeleteSubscribeObj(SMqSubscribeObj *pSub) {
  for (void *pIter = taosHashIterate(pSub->consumerHash, NULL); pIter != NULL;
       pIter = taosHashIterate(pSub->consumerHash, pIter)) {
    SMqConsumerEp *pEp = (SMqConsumerEp *)pIter;
    taosArrayDestroyP(pEp->vgs, (FDelete)tDeleteSMqVgEp);
    taosArrayDestroy(pEp->offsetRows);
  }

  taosHashCleanup(pSub->consumerHash);
  taosArrayDestroyP(pSub->unassignedVgs, (FDelete)tDeleteSMqVgEp);
  taosMemoryFreeClear(pSub->qmsg);
  taosArrayDestroy(pSub->offsetRows);
}

int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
  int32_t tlen = 0;
  tlen += taosEncodeString(buf, pSub->key);
L
Liu Jicong 已提交
548
  tlen += taosEncodeFixedI64(buf, pSub->dbUid);
549
  tlen += taosEncodeFixedI32(buf, pSub->vgNum);
L
Liu Jicong 已提交
550
  tlen += taosEncodeFixedI8(buf, pSub->subType);
L
Liu Jicong 已提交
551
  tlen += taosEncodeFixedI8(buf, pSub->withMeta);
L
Liu Jicong 已提交
552
  tlen += taosEncodeFixedI64(buf, pSub->stbUid);
553 554 555 556 557 558 559 560 561

  void   *pIter = NULL;
  int32_t sz = taosHashGetSize(pSub->consumerHash);
  tlen += taosEncodeFixedI32(buf, sz);

  int32_t cnt = 0;
  while (1) {
    pIter = taosHashIterate(pSub->consumerHash, pIter);
    if (pIter == NULL) break;
562 563
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    tlen += tEncodeSMqConsumerEp(buf, pConsumerEp);
564 565
    cnt++;
  }
566
  if (cnt != sz) return -1;
567
  tlen += taosEncodeArray(buf, pSub->unassignedVgs, (FEncode)tEncodeSMqVgEp);
L
Liu Jicong 已提交
568
  tlen += taosEncodeString(buf, pSub->dbName);
569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585

  int32_t szVgs = taosArrayGetSize(pSub->offsetRows);
  tlen += taosEncodeFixedI32(buf, szVgs);
  for (int32_t j= 0; j < szVgs; ++j) {
    OffsetRows *offRows = taosArrayGet(pSub->offsetRows, j);
    tlen += taosEncodeFixedI32(buf, offRows->vgId);
    tlen += taosEncodeFixedI64(buf, offRows->rows);
    tlen += taosEncodeFixedI8(buf, offRows->offset.type);
    if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
      tlen += taosEncodeFixedI64(buf, offRows->offset.uid);
      tlen += taosEncodeFixedI64(buf, offRows->offset.ts);
    } else if (offRows->offset.type == TMQ_OFFSET__LOG) {
      tlen += taosEncodeFixedI64(buf, offRows->offset.version);
    } else {
      // do nothing
    }
  }
586
  tlen += taosEncodeString(buf, pSub->qmsg);
587 588 589
  return tlen;
}

590
// Decodes a subscription object from buf, in the field order written by
// tEncodeSubscribeObj.
//
// The consumer hash is created here and filled with one entry per encoded
// consumer endpoint. The subscription-level offsetRows list and qmsg are only
// read when sver > 1; offsetRows is only created when its encoded count is
// positive.
//
// Returns the read position after the decoded data, or NULL when the hash, the
// offsetRows array or one of its slots cannot be allocated, or when a consumer
// endpoint fails to decode. On NULL, whatever was already attached to pSub is
// left in place.
// NOTE(review): presumably the caller releases it with tDeleteSubscribeObj - confirm.
void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) {
  buf = taosDecodeStringTo(buf, pSub->key);
  buf = taosDecodeFixedI64(buf, &pSub->dbUid);
  buf = taosDecodeFixedI32(buf, &pSub->vgNum);
  buf = taosDecodeFixedI8(buf, &pSub->subType);
  buf = taosDecodeFixedI8(buf, &pSub->withMeta);
  buf = taosDecodeFixedI64(buf, &pSub->stbUid);

  int32_t sz = 0;
  buf = taosDecodeFixedI32(buf, &sz);

  pSub->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
  if (pSub->consumerHash == NULL) return NULL;

  for (int32_t i = 0; i < sz; i++) {
    SMqConsumerEp consumerEp = {0};
    buf = tDecodeSMqConsumerEp(buf, &consumerEp, sver);
    if (buf == NULL) {
      // The endpoint decoder failed. consumerEp never reached the hash, so
      // release what it holds here, and do not keep decoding from a NULL
      // position.
      if (consumerEp.vgs != NULL) {
        taosArrayDestroyP(consumerEp.vgs, (FDelete)tDeleteSMqVgEp);
      }
      taosArrayDestroy(consumerEp.offsetRows);
      return NULL;
    }
    taosHashPut(pSub->consumerHash, &consumerEp.consumerId, sizeof(int64_t), &consumerEp, sizeof(SMqConsumerEp));
  }

  buf = taosDecodeArray(buf, &pSub->unassignedVgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver);
  buf = taosDecodeStringTo(buf, pSub->dbName);

  if (sver > 1) {
    int32_t szVgs = 0;
    buf = taosDecodeFixedI32(buf, &szVgs);
    if (szVgs > 0) {
      pSub->offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows));
      if (NULL == pSub->offsetRows) return NULL;
      for (int32_t j = 0; j < szVgs; ++j) {
        OffsetRows *offRows = taosArrayReserve(pSub->offsetRows, 1);
        // Without a slot there is nowhere to decode into.
        if (NULL == offRows) return NULL;
        buf = taosDecodeFixedI32(buf, &offRows->vgId);
        buf = taosDecodeFixedI64(buf, &offRows->rows);
        buf = taosDecodeFixedI8(buf, &offRows->offset.type);
        if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) {
          buf = taosDecodeFixedI64(buf, &offRows->offset.uid);
          buf = taosDecodeFixedI64(buf, &offRows->offset.ts);
        } else if (offRows->offset.type == TMQ_OFFSET__LOG) {
          buf = taosDecodeFixedI64(buf, &offRows->offset.version);
        } else {
          // other offset types carry no payload
        }
      }
    }
    buf = taosDecodeString(buf, &pSub->qmsg);
  }
  return (void *)buf;
}
L
Liu Jicong 已提交
637

638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699
//SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
//  SMqSubActionLogEntry *pEntryNew = taosMemoryMalloc(sizeof(SMqSubActionLogEntry));
//  if (pEntryNew == NULL) return NULL;
//  pEntryNew->epoch = pEntry->epoch;
//  pEntryNew->consumers = taosArrayDup(pEntry->consumers, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
//  return pEntryNew;
//}
//
//void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
//  taosArrayDestroyEx(pEntry->consumers, (FDelete)tDeleteSMqConsumerEp);
//}

//int32_t tEncodeSMqSubActionLogEntry(void **buf, const SMqSubActionLogEntry *pEntry) {
//  int32_t tlen = 0;
//  tlen += taosEncodeFixedI32(buf, pEntry->epoch);
//  tlen += taosEncodeArray(buf, pEntry->consumers, (FEncode)tEncodeSMqSubActionLogEntry);
//  return tlen;
//}
//
//void *tDecodeSMqSubActionLogEntry(const void *buf, SMqSubActionLogEntry *pEntry) {
//  buf = taosDecodeFixedI32(buf, &pEntry->epoch);
//  buf = taosDecodeArray(buf, &pEntry->consumers, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
//  return (void *)buf;
//}

//SMqSubActionLogObj *tCloneSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
//  SMqSubActionLogObj *pLogNew = taosMemoryMalloc(sizeof(SMqSubActionLogObj));
//  if (pLogNew == NULL) return pLogNew;
//  memcpy(pLogNew->key, pLog->key, TSDB_SUBSCRIBE_KEY_LEN);
//  pLogNew->logs = taosArrayDup(pLog->logs, (__array_item_dup_fn_t)tCloneSMqConsumerEp);
//  return pLogNew;
//}
//
//void tDeleteSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
//  taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqConsumerEp);
//}

//int32_t tEncodeSMqSubActionLogObj(void **buf, const SMqSubActionLogObj *pLog) {
//  int32_t tlen = 0;
//  tlen += taosEncodeString(buf, pLog->key);
//  tlen += taosEncodeArray(buf, pLog->logs, (FEncode)tEncodeSMqSubActionLogEntry);
//  return tlen;
//}
//
//void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) {
//  buf = taosDecodeStringTo(buf, pLog->key);
//  buf = taosDecodeArray(buf, &pLog->logs, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));
//  return (void *)buf;
//}
//
//int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) {
//  int32_t tlen = 0;
//  tlen += taosEncodeString(buf, pOffset->key);
//  tlen += taosEncodeFixedI64(buf, pOffset->offset);
//  return tlen;
//}
//
//void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) {
//  buf = taosDecodeStringTo(buf, pOffset->key);
//  buf = taosDecodeFixedI64(buf, &pOffset->offset);
//  return buf;
//}