// mndDef.c
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16
#define _DEFAULT_SOURCE
L
Liu Jicong 已提交
17
#include "mndDef.h"
18 19 20 21 22
#include "mndConsumer.h"

SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]) {
  SMqConsumerObj *pConsumer = taosMemoryCalloc(1, sizeof(SMqConsumerObj));
  if (pConsumer == NULL) {
S
Shengliang Guan 已提交
23
    terrno = TSDB_CODE_OUT_OF_MEMORY;
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
    return NULL;
  }

  pConsumer->consumerId = consumerId;
  memcpy(pConsumer->cgroup, cgroup, TSDB_CGROUP_LEN);

  pConsumer->epoch = 0;
  pConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
  pConsumer->hbStatus = 0;

  taosInitRWLatch(&pConsumer->lock);

  pConsumer->currentTopics = taosArrayInit(0, sizeof(void *));
  pConsumer->rebNewTopics = taosArrayInit(0, sizeof(void *));
  pConsumer->rebRemovedTopics = taosArrayInit(0, sizeof(void *));
L
Liu Jicong 已提交
39
  pConsumer->assignedTopics = taosArrayInit(0, sizeof(void *));
40

L
Liu Jicong 已提交
41 42
  if (pConsumer->currentTopics == NULL || pConsumer->rebNewTopics == NULL || pConsumer->rebRemovedTopics == NULL ||
      pConsumer->assignedTopics == NULL) {
43 44 45
    taosArrayDestroy(pConsumer->currentTopics);
    taosArrayDestroy(pConsumer->rebNewTopics);
    taosArrayDestroy(pConsumer->rebRemovedTopics);
L
Liu Jicong 已提交
46
    taosArrayDestroy(pConsumer->assignedTopics);
47 48 49 50
    taosMemoryFree(pConsumer);
    return NULL;
  }

L
Liu Jicong 已提交
51 52
  pConsumer->upTime = taosGetTimestampMs();

53 54 55 56 57 58 59 60 61 62 63 64 65
  return pConsumer;
}

// Releases the four topic arrays owned by `pConsumer`, freeing every element with
// taosMemoryFree. Arrays that are NULL are skipped. The consumer struct itself is not freed.
void tDeleteSMqConsumerObj(SMqConsumerObj *pConsumer) {
  SArray *topicLists[] = {
      pConsumer->currentTopics,
      pConsumer->rebNewTopics,
      pConsumer->rebRemovedTopics,
      pConsumer->assignedTopics,
  };
  const int32_t listNum = (int32_t)(sizeof(topicLists) / sizeof(topicLists[0]));

  for (int32_t i = 0; i < listNum; i++) {
    if (topicLists[i] == NULL) continue;
    taosArrayDestroyP(topicLists[i], (FDelete)taosMemoryFree);
  }
}

int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
  int32_t tlen = 0;
  int32_t sz;
  tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
  tlen += taosEncodeString(buf, pConsumer->cgroup);
  tlen += taosEncodeFixedI8(buf, pConsumer->updateType);
  tlen += taosEncodeFixedI32(buf, pConsumer->epoch);
  tlen += taosEncodeFixedI32(buf, pConsumer->status);

L
Liu Jicong 已提交
80 81 82 83 84 85
  tlen += taosEncodeFixedI32(buf, pConsumer->pid);
  tlen += taosEncodeSEpSet(buf, &pConsumer->ep);
  tlen += taosEncodeFixedI64(buf, pConsumer->upTime);
  tlen += taosEncodeFixedI64(buf, pConsumer->subscribeTime);
  tlen += taosEncodeFixedI64(buf, pConsumer->rebalanceTime);

86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
  // current topics
  if (pConsumer->currentTopics) {
    sz = taosArrayGetSize(pConsumer->currentTopics);
    tlen += taosEncodeFixedI32(buf, sz);
    for (int32_t i = 0; i < sz; i++) {
      char *topic = taosArrayGetP(pConsumer->currentTopics, i);
      tlen += taosEncodeString(buf, topic);
    }
  } else {
    tlen += taosEncodeFixedI32(buf, 0);
  }

  // reb new topics
  if (pConsumer->rebNewTopics) {
    sz = taosArrayGetSize(pConsumer->rebNewTopics);
    tlen += taosEncodeFixedI32(buf, sz);
    for (int32_t i = 0; i < sz; i++) {
      char *topic = taosArrayGetP(pConsumer->rebNewTopics, i);
      tlen += taosEncodeString(buf, topic);
    }
  } else {
    tlen += taosEncodeFixedI32(buf, 0);
  }

  // reb removed topics
  if (pConsumer->rebRemovedTopics) {
    sz = taosArrayGetSize(pConsumer->rebRemovedTopics);
    tlen += taosEncodeFixedI32(buf, sz);
    for (int32_t i = 0; i < sz; i++) {
      char *topic = taosArrayGetP(pConsumer->rebRemovedTopics, i);
      tlen += taosEncodeString(buf, topic);
    }
  } else {
    tlen += taosEncodeFixedI32(buf, 0);
  }

L
Liu Jicong 已提交
122 123 124 125 126 127 128 129 130 131 132 133
  // lost topics
  if (pConsumer->assignedTopics) {
    sz = taosArrayGetSize(pConsumer->assignedTopics);
    tlen += taosEncodeFixedI32(buf, sz);
    for (int32_t i = 0; i < sz; i++) {
      char *topic = taosArrayGetP(pConsumer->assignedTopics, i);
      tlen += taosEncodeString(buf, topic);
    }
  } else {
    tlen += taosEncodeFixedI32(buf, 0);
  }

134 135 136 137 138 139 140 141 142 143 144
  return tlen;
}

void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer) {
  int32_t sz;
  buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
  buf = taosDecodeStringTo(buf, pConsumer->cgroup);
  buf = taosDecodeFixedI8(buf, &pConsumer->updateType);
  buf = taosDecodeFixedI32(buf, &pConsumer->epoch);
  buf = taosDecodeFixedI32(buf, &pConsumer->status);

L
Liu Jicong 已提交
145 146 147 148 149 150
  buf = taosDecodeFixedI32(buf, &pConsumer->pid);
  buf = taosDecodeSEpSet(buf, &pConsumer->ep);
  buf = taosDecodeFixedI64(buf, &pConsumer->upTime);
  buf = taosDecodeFixedI64(buf, &pConsumer->subscribeTime);
  buf = taosDecodeFixedI64(buf, &pConsumer->rebalanceTime);

151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177
  // current topics
  buf = taosDecodeFixedI32(buf, &sz);
  pConsumer->currentTopics = taosArrayInit(sz, sizeof(void *));
  for (int32_t i = 0; i < sz; i++) {
    char *topic;
    buf = taosDecodeString(buf, &topic);
    taosArrayPush(pConsumer->currentTopics, &topic);
  }

  // reb new topics
  buf = taosDecodeFixedI32(buf, &sz);
  pConsumer->rebNewTopics = taosArrayInit(sz, sizeof(void *));
  for (int32_t i = 0; i < sz; i++) {
    char *topic;
    buf = taosDecodeString(buf, &topic);
    taosArrayPush(pConsumer->rebNewTopics, &topic);
  }

  // reb removed topics
  buf = taosDecodeFixedI32(buf, &sz);
  pConsumer->rebRemovedTopics = taosArrayInit(sz, sizeof(void *));
  for (int32_t i = 0; i < sz; i++) {
    char *topic;
    buf = taosDecodeString(buf, &topic);
    taosArrayPush(pConsumer->rebRemovedTopics, &topic);
  }

L
Liu Jicong 已提交
178 179 180 181 182 183 184 185 186
  // reb removed topics
  buf = taosDecodeFixedI32(buf, &sz);
  pConsumer->assignedTopics = taosArrayInit(sz, sizeof(void *));
  for (int32_t i = 0; i < sz; i++) {
    char *topic;
    buf = taosDecodeString(buf, &topic);
    taosArrayPush(pConsumer->assignedTopics, &topic);
  }

187 188 189 190 191 192 193 194 195 196 197 198
  return (void *)buf;
}

// Creates a heap copy of `pVgEp`: vgId and epSet are copied by value and qmsg is duplicated
// with strdup. A NULL qmsg in the source stays NULL in the copy.
//
// Returns the copy, or NULL if the struct or the qmsg duplicate cannot be allocated.
SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) {
  SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp));
  if (pVgEpNew == NULL) return NULL;

  pVgEpNew->vgId = pVgEp->vgId;
  pVgEpNew->epSet = pVgEp->epSet;

  // strdup(NULL) is undefined, and tDeleteSMqVgEp treats a NULL qmsg as valid.
  pVgEpNew->qmsg = NULL;
  if (pVgEp->qmsg != NULL) {
    pVgEpNew->qmsg = strdup(pVgEp->qmsg);
    if (pVgEpNew->qmsg == NULL) {
      taosMemoryFree(pVgEpNew);
      return NULL;
    }
  }
  return pVgEpNew;
}

L
Liu Jicong 已提交
199 200 201
// Frees the qmsg string of `pVgEp` when it is set. The SMqVgEp struct itself is not freed here.
void tDeleteSMqVgEp(SMqVgEp *pVgEp) {
  if (pVgEp->qmsg == NULL) {
    return;
  }
  taosMemoryFree(pVgEp->qmsg);
}
202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217

// Serialises a vgroup endpoint through `buf`: vgId (int32), qmsg (string), then epSet.
// Returns the sum of the lengths reported by the three encode calls.
int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) {
  const int32_t vgIdLen = taosEncodeFixedI32(buf, pVgEp->vgId);
  const int32_t qmsgLen = taosEncodeString(buf, pVgEp->qmsg);
  const int32_t epSetLen = taosEncodeSEpSet(buf, &pVgEp->epSet);

  return vgIdLen + qmsgLen + epSetLen;
}

// Deserialises a vgroup endpoint from `buf` in the order written by tEncodeSMqVgEp:
// vgId, qmsg, epSet. Returns the read position after the last decoded field.
void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp) {
  const void *cursor = buf;

  cursor = taosDecodeFixedI32(cursor, &pVgEp->vgId);
  cursor = taosDecodeString(cursor, &pVgEp->qmsg);
  cursor = taosDecodeSEpSet(cursor, &pVgEp->epSet);

  return (void *)cursor;
}

218 219 220 221 222 223
SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) {
  SMqConsumerEp *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEp));
  if (pConsumerEpNew == NULL) return NULL;
  pConsumerEpNew->consumerId = pConsumerEpOld->consumerId;
  pConsumerEpNew->vgs = taosArrayDeepCopy(pConsumerEpOld->vgs, (FCopy)tCloneSMqVgEp);
  return pConsumerEpNew;
224 225
}

226 227 228
// Destroys the vgs array of `pConsumerEp`, passing tDeleteSMqVgEp as the element deleter.
// The SMqConsumerEp struct itself is not freed here.
void tDeleteSMqConsumerEp(SMqConsumerEp *pConsumerEp) {
  SArray *pVgs = pConsumerEp->vgs;
  taosArrayDestroyP(pVgs, (FDelete)tDeleteSMqVgEp);
}

231
int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
232
  int32_t tlen = 0;
233 234 235 236
  tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
  tlen += taosEncodeArray(buf, pConsumerEp->vgs, (FEncode)tEncodeSMqVgEp);
#if 0
  int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
L
Liu Jicong 已提交
237 238
  tlen += taosEncodeFixedI32(buf, sz);
  for (int32_t i = 0; i < sz; i++) {
239
    SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
L
Liu Jicong 已提交
240 241
    tlen += tEncodeSMqVgEp(buf, pVgEp);
  }
242
#endif
243 244 245
  return tlen;
}

246 247
// Deserialises a consumer endpoint from `buf`: consumerId (int64) followed by the vgs array,
// whose elements are decoded with tDecodeSMqVgEp using sizeof(SMqVgEp) as the element size.
// Returns the read position after the decoded data.
void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp) {
  const void *cursor = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
  cursor = taosDecodeArray(cursor, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp));

  return (void *)cursor;
}

SMqSubscribeObj *tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]) {
L
Liu Jicong 已提交
264
  SMqSubscribeObj *pSubNew = taosMemoryCalloc(1, sizeof(SMqSubscribeObj));
265 266 267
  if (pSubNew == NULL) return NULL;
  memcpy(pSubNew->key, key, TSDB_SUBSCRIBE_KEY_LEN);
  taosInitRWLatch(&pSubNew->lock);
L
Liu Jicong 已提交
268
  pSubNew->vgNum = 0;
269
  pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
270 271 272 273 274
  // TODO set hash free fp
  /*taosHashSetFreeFp(pSubNew->consumerHash, tDeleteSMqConsumerEp);*/

  pSubNew->unassignedVgs = taosArrayInit(0, sizeof(void *));

275 276 277 278 279 280 281 282
  return pSubNew;
}

SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
  SMqSubscribeObj *pSubNew = taosMemoryMalloc(sizeof(SMqSubscribeObj));
  if (pSubNew == NULL) return NULL;
  memcpy(pSubNew->key, pSub->key, TSDB_SUBSCRIBE_KEY_LEN);
  taosInitRWLatch(&pSubNew->lock);
L
Liu Jicong 已提交
283

L
Liu Jicong 已提交
284
  pSubNew->dbUid = pSub->dbUid;
L
Liu Jicong 已提交
285 286 287 288 289
  pSubNew->subType = pSub->subType;
  pSubNew->withTbName = pSub->withTbName;
  pSubNew->withSchema = pSub->withSchema;
  pSubNew->withTag = pSub->withTag;

290 291
  pSubNew->vgNum = pSub->vgNum;
  pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
292 293 294 295
  // TODO set hash free fp
  /*taosHashSetFreeFp(pSubNew->consumerHash, tDeleteSMqConsumerEp);*/
  void          *pIter = NULL;
  SMqConsumerEp *pConsumerEp = NULL;
296
  while (1) {
L
Liu Jicong 已提交
297
    pIter = taosHashIterate(pSub->consumerHash, pIter);
298
    if (pIter == NULL) break;
299 300 301 302
    pConsumerEp = (SMqConsumerEp *)pIter;
    SMqConsumerEp newEp = {
        .consumerId = pConsumerEp->consumerId,
        .vgs = taosArrayDeepCopy(pConsumerEp->vgs, (FCopy)tCloneSMqVgEp),
303
    };
304
    taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp, sizeof(SMqConsumerEp));
305
  }
306
  pSubNew->unassignedVgs = taosArrayDeepCopy(pSub->unassignedVgs, (FCopy)tCloneSMqVgEp);
307 308 309 310 311
  return pSubNew;
}

// Releases the containers owned by `pSub`: the vgs array of every consumer endpoint stored in
// the consumer hash, the hash itself, and the unassignedVgs array. The SMqSubscribeObj struct
// itself is not freed here.
//
// The hash has no free callback installed (see the TODO in tNewSubscribeObj), so the
// per-consumer vgs arrays are released explicitly before the hash is cleaned up.
// NOTE(review): assumes the hash entries own their vgs arrays, as they do after
// tCloneSubscribeObj; confirm no caller frees them separately.
void tDeleteSubscribeObj(SMqSubscribeObj *pSub) {
  if (pSub->consumerHash != NULL) {
    void *pIter = NULL;
    while ((pIter = taosHashIterate(pSub->consumerHash, pIter)) != NULL) {
      tDeleteSMqConsumerEp((SMqConsumerEp *)pIter);
    }
  }
  taosHashCleanup(pSub->consumerHash);
  taosArrayDestroyP(pSub->unassignedVgs, (FDelete)tDeleteSMqVgEp);
}

int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
  int32_t tlen = 0;
  tlen += taosEncodeString(buf, pSub->key);
L
Liu Jicong 已提交
318
  tlen += taosEncodeFixedI64(buf, pSub->dbUid);
319
  tlen += taosEncodeFixedI32(buf, pSub->vgNum);
L
Liu Jicong 已提交
320 321 322 323
  tlen += taosEncodeFixedI8(buf, pSub->subType);
  tlen += taosEncodeFixedI8(buf, pSub->withTbName);
  tlen += taosEncodeFixedI8(buf, pSub->withSchema);
  tlen += taosEncodeFixedI8(buf, pSub->withTag);
324 325 326 327 328 329 330 331 332

  void   *pIter = NULL;
  int32_t sz = taosHashGetSize(pSub->consumerHash);
  tlen += taosEncodeFixedI32(buf, sz);

  int32_t cnt = 0;
  while (1) {
    pIter = taosHashIterate(pSub->consumerHash, pIter);
    if (pIter == NULL) break;
333 334
    SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
    tlen += tEncodeSMqConsumerEp(buf, pConsumerEp);
335 336 337
    cnt++;
  }
  ASSERT(cnt == sz);
338
  tlen += taosEncodeArray(buf, pSub->unassignedVgs, (FEncode)tEncodeSMqVgEp);
339 340 341 342 343 344
  return tlen;
}

// Deserialises a subscription object from `buf` into `pSub`, in the order written by
// tEncodeSubscribeObj. A new consumer hash is created and each decoded consumer endpoint is
// inserted by value under its consumerId; unassignedVgs is decoded with tDecodeSMqVgEp.
// Returns the read position after the decoded data.
void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) {
  const void *cursor = buf;

  cursor = taosDecodeStringTo(cursor, pSub->key);
  cursor = taosDecodeFixedI64(cursor, &pSub->dbUid);
  cursor = taosDecodeFixedI32(cursor, &pSub->vgNum);
  cursor = taosDecodeFixedI8(cursor, &pSub->subType);
  cursor = taosDecodeFixedI8(cursor, &pSub->withTbName);
  cursor = taosDecodeFixedI8(cursor, &pSub->withSchema);
  cursor = taosDecodeFixedI8(cursor, &pSub->withTag);

  int32_t sz;
  cursor = taosDecodeFixedI32(cursor, &sz);

  pSub->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
  for (int32_t left = sz; left > 0; left--) {
    SMqConsumerEp consumerEp = {0};
    cursor = tDecodeSMqConsumerEp(cursor, &consumerEp);
    taosHashPut(pSub->consumerHash, &consumerEp.consumerId, sizeof(int64_t), &consumerEp, sizeof(SMqConsumerEp));
  }

  cursor = taosDecodeArray(cursor, &pSub->unassignedVgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp));
  return (void *)cursor;
}

// Creates a heap copy of `pEntry`: epoch is copied by value and consumers is deep-copied
// element by element with tCloneSMqConsumerEp.
// Returns the copy, or NULL if the struct cannot be allocated.
SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
  SMqSubActionLogEntry *pCopy = taosMemoryMalloc(sizeof(SMqSubActionLogEntry));

  if (pCopy != NULL) {
    pCopy->epoch = pEntry->epoch;
    pCopy->consumers = taosArrayDeepCopy(pEntry->consumers, (FCopy)tCloneSMqConsumerEp);
  }
  return pCopy;
}

// Destroys the consumers array of `pEntry`, passing tDeleteSMqConsumerEp as the element
// deleter. The entry struct itself is not freed here.
void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
  SArray *pConsumers = pEntry->consumers;
  taosArrayDestroyEx(pConsumers, (FDelete)tDeleteSMqConsumerEp);
}

int32_t tEncodeSMqSubActionLogEntry(void **buf, const SMqSubActionLogEntry *pEntry) {
  int32_t tlen = 0;
  tlen += taosEncodeFixedI32(buf, pEntry->epoch);
  tlen += taosEncodeArray(buf, pEntry->consumers, (FEncode)tEncodeSMqSubActionLogEntry);
  return tlen;
}
L
Liu Jicong 已提交
384

385 386 387 388 389 390 391 392 393 394
// Deserialises a log entry from `buf`: epoch (int32) followed by the consumers array.
//
// The elements of `consumers` are consumer endpoints, so each one is decoded with
// tDecodeSMqConsumerEp and sized as sizeof(SMqConsumerEp), mirroring
// tEncodeSMqSubActionLogEntry. Passing this function itself as the element decoder would
// recurse on elements of the wrong type and size.
//
// Returns the read position after the decoded data.
void *tDecodeSMqSubActionLogEntry(const void *buf, SMqSubActionLogEntry *pEntry) {
  buf = taosDecodeFixedI32(buf, &pEntry->epoch);
  buf = taosDecodeArray(buf, &pEntry->consumers, (FDecode)tDecodeSMqConsumerEp, sizeof(SMqConsumerEp));
  return (void *)buf;
}

// Creates a heap copy of `pLog`: key (TSDB_SUBSCRIBE_KEY_LEN bytes) is copied and logs is
// deep-copied element by element.
//
// The elements of `logs` are log entries (they are encoded and decoded with
// t{Encode,Decode}SMqSubActionLogEntry), so each one is cloned with
// tCloneSMqSubActionLogEntry rather than as a consumer endpoint.
//
// Returns the copy, or NULL if the struct cannot be allocated.
SMqSubActionLogObj *tCloneSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
  SMqSubActionLogObj *pLogNew = taosMemoryMalloc(sizeof(SMqSubActionLogObj));
  if (pLogNew == NULL) return pLogNew;
  memcpy(pLogNew->key, pLog->key, TSDB_SUBSCRIBE_KEY_LEN);
  pLogNew->logs = taosArrayDeepCopy(pLog->logs, (FCopy)tCloneSMqSubActionLogEntry);
  return pLogNew;
}

// Destroys the logs array of `pLog`. The log object struct itself is not freed here.
//
// The elements of `logs` are log entries (they are encoded and decoded with
// t{Encode,Decode}SMqSubActionLogEntry), so the element deleter is
// tDeleteSMqSubActionLogEntry rather than the consumer-endpoint deleter.
void tDeleteSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
  taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqSubActionLogEntry);
}

// Serialises a log object through `buf`: key (string) followed by the logs array, whose
// elements are encoded with tEncodeSMqSubActionLogEntry.
// Returns the sum of the lengths reported by the two encode calls.
int32_t tEncodeSMqSubActionLogObj(void **buf, const SMqSubActionLogObj *pLog) {
  const int32_t keyLen = taosEncodeString(buf, pLog->key);
  const int32_t logsLen = taosEncodeArray(buf, pLog->logs, (FEncode)tEncodeSMqSubActionLogEntry);

  return keyLen + logsLen;
}

// Deserialises a log object from `buf`: key (decoded in place into pLog->key) followed by the
// logs array, whose elements are decoded with tDecodeSMqSubActionLogEntry.
// Returns the read position after the decoded data.
void *tDecodeSMqSubActionLogObj(const void *buf, SMqSubActionLogObj *pLog) {
  const void *cursor = taosDecodeStringTo(buf, pLog->key);
  cursor = taosDecodeArray(cursor, &pLog->logs, (FDecode)tDecodeSMqSubActionLogEntry, sizeof(SMqSubActionLogEntry));

  return (void *)cursor;
}
L
Liu Jicong 已提交
415

H
Hongze Cheng 已提交
416
// Serialises a stream object with `pEncoder`.
//
// Field order (must stay in sync with tDecodeSStreamObj): name, sourceDb, targetDb,
// targetSTbName, targetStbUid, createTime, updateTime, uid, dbUid, version, status,
// createdBy, trigger, triggerParam, waterMark, fixedSinkVgId, smaId, sql, physicalPlan, the
// task levels (level count, then per level a task count followed by each task), and finally
// outputSchema. logicalPlan is not serialised.
//
// Returns pEncoder->pos on success, or -1 as soon as any encode call reports a negative value.
int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
  // Names and target table.
  if (tEncodeCStr(pEncoder, pObj->name) < 0 ||
      tEncodeCStr(pEncoder, pObj->sourceDb) < 0 ||
      tEncodeCStr(pEncoder, pObj->targetDb) < 0 ||
      tEncodeCStr(pEncoder, pObj->targetSTbName) < 0 ||
      tEncodeI64(pEncoder, pObj->targetStbUid) < 0) {
    return -1;
  }

  // Timestamps, identifiers and state.
  if (tEncodeI64(pEncoder, pObj->createTime) < 0 ||
      tEncodeI64(pEncoder, pObj->updateTime) < 0 ||
      tEncodeI64(pEncoder, pObj->uid) < 0 ||
      tEncodeI64(pEncoder, pObj->dbUid) < 0 ||
      tEncodeI32(pEncoder, pObj->version) < 0 ||
      tEncodeI8(pEncoder, pObj->status) < 0 ||
      tEncodeI8(pEncoder, pObj->createdBy) < 0) {
    return -1;
  }

  // Trigger settings and sink.
  if (tEncodeI8(pEncoder, pObj->trigger) < 0 ||
      tEncodeI32(pEncoder, pObj->triggerParam) < 0 ||
      tEncodeI64(pEncoder, pObj->waterMark) < 0 ||
      tEncodeI32(pEncoder, pObj->fixedSinkVgId) < 0 ||
      tEncodeI64(pEncoder, pObj->smaId) < 0) {
    return -1;
  }

  // Statement text and plan.
  if (tEncodeCStr(pEncoder, pObj->sql) < 0 ||
      tEncodeCStr(pEncoder, pObj->physicalPlan) < 0) {
    return -1;
  }

  // Tasks: an array of levels, each level an array of task pointers. A NULL array is written
  // as zero levels.
  const int32_t levelNum = (pObj->tasks != NULL) ? (int32_t)taosArrayGetSize(pObj->tasks) : 0;
  if (tEncodeI32(pEncoder, levelNum) < 0) {
    return -1;
  }

  for (int32_t level = 0; level < levelNum; level++) {
    SArray       *pLevel = taosArrayGetP(pObj->tasks, level);
    const int32_t taskNum = taosArrayGetSize(pLevel);
    if (tEncodeI32(pEncoder, taskNum) < 0) {
      return -1;
    }

    for (int32_t t = 0; t < taskNum; t++) {
      SStreamTask *pTask = taosArrayGetP(pLevel, t);
      if (tEncodeSStreamTask(pEncoder, pTask) < 0) {
        return -1;
      }
    }
  }

  if (tEncodeSSchemaWrapper(pEncoder, &pObj->outputSchema) < 0) {
    return -1;
  }

  return pEncoder->pos;
}

H
Hongze Cheng 已提交
470
int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
L
Liu Jicong 已提交
471
  if (tDecodeCStrTo(pDecoder, pObj->name) < 0) return -1;
L
Liu Jicong 已提交
472
  if (tDecodeCStrTo(pDecoder, pObj->sourceDb) < 0) return -1;
L
Liu Jicong 已提交
473 474 475
  if (tDecodeCStrTo(pDecoder, pObj->targetDb) < 0) return -1;
  if (tDecodeCStrTo(pDecoder, pObj->targetSTbName) < 0) return -1;
  if (tDecodeI64(pDecoder, &pObj->targetStbUid) < 0) return -1;
L
Liu Jicong 已提交
476 477 478 479 480 481
  if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
  if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1;
  if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;
  if (tDecodeI64(pDecoder, &pObj->dbUid) < 0) return -1;
  if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1;
  if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1;
L
Liu Jicong 已提交
482
  if (tDecodeI8(pDecoder, &pObj->createdBy) < 0) return -1;
L
fix  
Liu Jicong 已提交
483 484 485
  if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1;
  if (tDecodeI32(pDecoder, &pObj->triggerParam) < 0) return -1;
  if (tDecodeI64(pDecoder, &pObj->waterMark) < 0) return -1;
L
Liu Jicong 已提交
486 487
  if (tDecodeI32(pDecoder, &pObj->fixedSinkVgId) < 0) return -1;
  if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1;
L
Liu Jicong 已提交
488
  if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1;
L
Liu Jicong 已提交
489
  /*if (tDecodeCStrAlloc(pDecoder, &pObj->logicalPlan) < 0) return -1;*/
L
Liu Jicong 已提交
490
  if (tDecodeCStrAlloc(pDecoder, &pObj->physicalPlan) < 0) return -1;
L
Liu Jicong 已提交
491
  pObj->tasks = NULL;
L
Liu Jicong 已提交
492 493 494
  int32_t sz;
  if (tDecodeI32(pDecoder, &sz) < 0) return -1;
  if (sz != 0) {
L
Liu Jicong 已提交
495
    pObj->tasks = taosArrayInit(sz, sizeof(void *));
L
Liu Jicong 已提交
496 497 498
    for (int32_t i = 0; i < sz; i++) {
      int32_t innerSz;
      if (tDecodeI32(pDecoder, &innerSz) < 0) return -1;
L
Liu Jicong 已提交
499
      SArray *pArray = taosArrayInit(innerSz, sizeof(void *));
L
Liu Jicong 已提交
500
      for (int32_t j = 0; j < innerSz; j++) {
L
Liu Jicong 已提交
501 502 503 504
        SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
        if (pTask == NULL) return -1;
        if (tDecodeSStreamTask(pDecoder, pTask) < 0) return -1;
        taosArrayPush(pArray, &pTask);
L
Liu Jicong 已提交
505
      }
L
Liu Jicong 已提交
506
      taosArrayPush(pObj->tasks, &pArray);
L
Liu Jicong 已提交
507 508
    }
  }
L
Liu Jicong 已提交
509 510 511

  if (tDecodeSSchemaWrapper(pDecoder, &pObj->outputSchema) < 0) return -1;
#if 0
L
Liu Jicong 已提交
512 513
  int32_t outputNameSz;
  if (tDecodeI32(pDecoder, &outputNameSz) < 0) return -1;
L
Liu Jicong 已提交
514 515 516 517 518
  if (outputNameSz != 0) {
    pObj->ColAlias = taosArrayInit(outputNameSz, sizeof(void *));
    if (pObj->ColAlias == NULL) {
      return -1;
    }
L
Liu Jicong 已提交
519 520 521 522
  }
  for (int32_t i = 0; i < outputNameSz; i++) {
    char *name;
    if (tDecodeCStrAlloc(pDecoder, &name) < 0) return -1;
L
Liu Jicong 已提交
523
    taosArrayPush(pObj->ColAlias, &name);
L
Liu Jicong 已提交
524
  }
L
Liu Jicong 已提交
525
#endif
L
Liu Jicong 已提交
526 527
  return 0;
}
S
Shengliang Guan 已提交
528 529 530 531 532 533 534 535 536 537 538 539

// Serialises an offset object through `buf`: key (string) followed by offset (int64).
// Returns the sum of the lengths reported by the two encode calls.
int32_t tEncodeSMqOffsetObj(void **buf, const SMqOffsetObj *pOffset) {
  const int32_t keyLen = taosEncodeString(buf, pOffset->key);
  const int32_t offsetLen = taosEncodeFixedI64(buf, pOffset->offset);

  return keyLen + offsetLen;
}

// Deserialises an offset object from `buf`: key (decoded in place into pOffset->key) followed
// by offset (int64). Returns the read position after the decoded data.
void *tDecodeSMqOffsetObj(void *buf, SMqOffsetObj *pOffset) {
  void *cursor = taosDecodeStringTo(buf, pOffset->key);
  cursor = taosDecodeFixedI64(cursor, &pOffset->offset);

  return cursor;
}