mndDef.h 17.4 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16 17
#ifndef _TD_MND_DEF_H_
#define _TD_MND_DEF_H_
S
Shengliang Guan 已提交
18

19
#include "os.h"
S
Shengliang Guan 已提交
20 21

#include "cJSON.h"
L
Liu Jicong 已提交
22
#include "scheduler.h"
S
Shengliang Guan 已提交
23 24
#include "sync.h"
#include "thash.h"
L
Liu Jicong 已提交
25
#include "tlist.h"
S
Shengliang Guan 已提交
26
#include "tlog.h"
L
Liu Jicong 已提交
27
#include "tmsg.h"
S
Shengliang Guan 已提交
28 29
#include "trpc.h"
#include "ttimer.h"
S
Shengliang Guan 已提交
30

S
Shengliang Guan 已提交
31
#include "mnode.h"
S
Shengliang Guan 已提交
32

S
Shengliang Guan 已提交
33 34 35
#ifdef __cplusplus
extern "C" {
#endif
S
Shengliang Guan 已提交
36

S
Shengliang Guan 已提交
37 38 39
extern int32_t mDebugFlag;

// mnode log function
L
Liu Jicong 已提交
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
#define mFatal(...)                                 \
  {                                                 \
    if (mDebugFlag & DEBUG_FATAL) {                 \
      taosPrintLog("MND FATAL ", 255, __VA_ARGS__); \
    }                                               \
  }
#define mError(...)                                 \
  {                                                 \
    if (mDebugFlag & DEBUG_ERROR) {                 \
      taosPrintLog("MND ERROR ", 255, __VA_ARGS__); \
    }                                               \
  }
#define mWarn(...)                                 \
  {                                                \
    if (mDebugFlag & DEBUG_WARN) {                 \
      taosPrintLog("MND WARN ", 255, __VA_ARGS__); \
    }                                              \
  }
#define mInfo(...)                            \
  {                                           \
    if (mDebugFlag & DEBUG_INFO) {            \
      taosPrintLog("MND ", 255, __VA_ARGS__); \
    }                                         \
  }
#define mDebug(...)                                  \
  {                                                  \
    if (mDebugFlag & DEBUG_DEBUG) {                  \
      taosPrintLog("MND ", mDebugFlag, __VA_ARGS__); \
    }                                                \
  }
#define mTrace(...)                                  \
  {                                                  \
    if (mDebugFlag & DEBUG_TRACE) {                  \
      taosPrintLog("MND ", mDebugFlag, __VA_ARGS__); \
    }                                                \
  }
S
Shengliang Guan 已提交
76 77

typedef enum {
78 79 80 81 82 83 84 85
  MND_AUTH_ACCT_START = 0,
  MND_AUTH_ACCT_USER,
  MND_AUTH_ACCT_DNODE,
  MND_AUTH_ACCT_MNODE,
  MND_AUTH_ACCT_DB,
  MND_AUTH_ACCT_TABLE,
  MND_AUTH_ACCT_MAX
} EAuthAcct;
S
Shengliang Guan 已提交
86 87

typedef enum {
88 89 90 91 92 93
  MND_AUTH_OP_START = 0,
  MND_AUTH_OP_CREATE_USER,
  MND_AUTH_OP_ALTER_USER,
  MND_AUTH_OP_DROP_USER,
  MND_AUTH_MAX
} EAuthOp;
S
Shengliang Guan 已提交
94

S
Shengliang Guan 已提交
95
typedef enum {
96
  TRN_STAGE_PREPARE = 0,
S
Shengliang Guan 已提交
97 98
  TRN_STAGE_REDO_LOG = 1,
  TRN_STAGE_REDO_ACTION = 2,
99 100
  TRN_STAGE_COMMIT = 3,
  TRN_STAGE_COMMIT_LOG = 4,
101 102
  TRN_STAGE_UNDO_ACTION = 5,
  TRN_STAGE_UNDO_LOG = 6,
S
Shengliang Guan 已提交
103 104
  TRN_STAGE_ROLLBACK = 7,
  TRN_STAGE_FINISHED = 8
S
Shengliang Guan 已提交
105 106
} ETrnStage;

S
Shengliang Guan 已提交
107 108 109 110
typedef enum {
  TRN_TYPE_CREATE_DB = 0,
} ETrnType;

111
typedef enum { TRN_POLICY_ROLLBACK = 0, TRN_POLICY_RETRY = 1 } ETrnPolicy;
S
Shengliang Guan 已提交
112

S
Shengliang Guan 已提交
113 114 115 116 117 118 119 120 121 122 123 124 125 126
typedef enum {
  DND_REASON_ONLINE = 0,
  DND_REASON_STATUS_MSG_TIMEOUT,
  DND_REASON_STATUS_NOT_RECEIVED,
  DND_REASON_VERSION_NOT_MATCH,
  DND_REASON_DNODE_ID_NOT_MATCH,
  DND_REASON_CLUSTER_ID_NOT_MATCH,
  DND_REASON_STATUS_INTERVAL_NOT_MATCH,
  DND_REASON_TIME_ZONE_NOT_MATCH,
  DND_REASON_LOCALE_NOT_MATCH,
  DND_REASON_CHARSET_NOT_MATCH,
  DND_REASON_OTHERS
} EDndReason;

S
Shengliang Guan 已提交
127
typedef struct {
S
Shengliang Guan 已提交
128 129 130
  int32_t    id;
  ETrnStage  stage;
  ETrnPolicy policy;
S
Shengliang Guan 已提交
131 132
  int32_t    code;
  int32_t    failedTimes;
L
Liu Jicong 已提交
133 134
  void*      rpcHandle;
  void*      rpcAHandle;
S
Shengliang Guan 已提交
135 136
  void*      rpcRsp;
  int32_t    rpcRspLen;
L
Liu Jicong 已提交
137 138 139 140 141
  SArray*    redoLogs;
  SArray*    undoLogs;
  SArray*    commitLogs;
  SArray*    redoActions;
  SArray*    undoActions;
S
Shengliang Guan 已提交
142 143 144 145
  int64_t    createdTime;
  int64_t    lastExecTime;
  int32_t    transType;
  uint64_t   dbUid;
S
Shengliang Guan 已提交
146
  char       dbname[TSDB_DB_FNAME_LEN];
S
Shengliang Guan 已提交
147
  char       lastError[TSDB_TRANS_DESC_LEN];
S
Shengliang Guan 已提交
148
} STrans;
S
Shengliang Guan 已提交
149

S
Shengliang Guan 已提交
150
typedef struct {
151
  int64_t id;
S
Shengliang Guan 已提交
152
  char    name[TSDB_CLUSTER_ID_LEN];
S
Shengliang Guan 已提交
153 154 155 156
  int64_t createdTime;
  int64_t updateTime;
} SClusterObj;

S
Shengliang Guan 已提交
157
typedef struct {
S
Shengliang Guan 已提交
158 159 160 161
  int32_t    id;
  int64_t    createdTime;
  int64_t    updateTime;
  int64_t    rebootTime;
162
  int64_t    lastAccessTime;
S
Shengliang Guan 已提交
163
  int32_t    accessTimes;
S
Shengliang Guan 已提交
164
  int32_t    numOfVnodes;
S
Shengliang Guan 已提交
165 166
  int32_t    numOfSupportVnodes;
  int32_t    numOfCores;
S
Shengliang Guan 已提交
167 168 169 170
  EDndReason offlineReason;
  uint16_t   port;
  char       fqdn[TSDB_FQDN_LEN];
  char       ep[TSDB_EP_LEN];
S
Shengliang Guan 已提交
171 172
} SDnodeObj;

S
Shengliang Guan 已提交
173
typedef struct {
S
Shengliang Guan 已提交
174
  int32_t    id;
S
Shengliang Guan 已提交
175 176
  int64_t    createdTime;
  int64_t    updateTime;
S
Shengliang Guan 已提交
177
  ESyncState role;
S
Shengliang Guan 已提交
178 179
  int32_t    roleTerm;
  int64_t    roleTime;
L
Liu Jicong 已提交
180
  SDnodeObj* pDnode;
S
Shengliang Guan 已提交
181 182
} SMnodeObj;

S
Shengliang Guan 已提交
183 184 185 186
typedef struct {
  int32_t    id;
  int64_t    createdTime;
  int64_t    updateTime;
L
Liu Jicong 已提交
187
  SDnodeObj* pDnode;
S
Shengliang Guan 已提交
188 189 190 191 192 193
} SQnodeObj;

typedef struct {
  int32_t    id;
  int64_t    createdTime;
  int64_t    updateTime;
L
Liu Jicong 已提交
194
  SDnodeObj* pDnode;
S
Shengliang Guan 已提交
195 196 197 198 199 200
} SSnodeObj;

typedef struct {
  int32_t    id;
  int64_t    createdTime;
  int64_t    updateTime;
L
Liu Jicong 已提交
201
  SDnodeObj* pDnode;
S
Shengliang Guan 已提交
202 203
} SBnodeObj;

S
Shengliang Guan 已提交
204 205 206
typedef struct {
  int32_t maxUsers;
  int32_t maxDbs;
S
Shengliang Guan 已提交
207 208
  int32_t maxStbs;
  int32_t maxTbs;
S
Shengliang Guan 已提交
209 210
  int32_t maxTimeSeries;
  int32_t maxStreams;
S
Shengliang Guan 已提交
211 212 213 214
  int32_t maxFuncs;
  int32_t maxConsumers;
  int32_t maxConns;
  int32_t maxTopics;
S
Shengliang Guan 已提交
215 216
  int64_t maxStorage;   // In unit of GB
  int32_t accessState;  // Configured only by command
S
Shengliang Guan 已提交
217 218 219 220 221 222 223
} SAcctCfg;

typedef struct {
  int32_t numOfUsers;
  int32_t numOfDbs;
  int32_t numOfTimeSeries;
  int32_t numOfStreams;
S
Shengliang Guan 已提交
224 225
  int64_t totalStorage;  // Total storage wrtten from this account
  int64_t compStorage;   // Compressed storage on disk
S
Shengliang Guan 已提交
226 227
} SAcctInfo;

S
Shengliang Guan 已提交
228
typedef struct {
S
Shengliang Guan 已提交
229 230 231 232
  char      acct[TSDB_USER_LEN];
  int64_t   createdTime;
  int64_t   updateTime;
  int32_t   acctId;
S
Shengliang Guan 已提交
233
  int32_t   status;
S
Shengliang Guan 已提交
234 235 236 237
  SAcctCfg  cfg;
  SAcctInfo info;
} SAcctObj;

S
Shengliang Guan 已提交
238
typedef struct {
S
Shengliang Guan 已提交
239
  char      user[TSDB_USER_LEN];
240
  char      pass[TSDB_PASSWORD_LEN];
S
Shengliang Guan 已提交
241 242 243
  char      acct[TSDB_USER_LEN];
  int64_t   createdTime;
  int64_t   updateTime;
244
  int8_t    superUser;
S
Shengliang Guan 已提交
245
  int32_t   acctId;
246 247
  SHashObj* readDbs;
  SHashObj* writeDbs;
S
Shengliang Guan 已提交
248 249 250
} SUserObj;

typedef struct {
251
  int32_t numOfVgroups;
S
Shengliang Guan 已提交
252 253 254 255 256 257
  int32_t cacheBlockSize;
  int32_t totalBlocks;
  int32_t daysPerFile;
  int32_t daysToKeep0;
  int32_t daysToKeep1;
  int32_t daysToKeep2;
S
Shengliang Guan 已提交
258 259
  int32_t minRows;
  int32_t maxRows;
S
Shengliang Guan 已提交
260 261
  int32_t commitTime;
  int32_t fsyncPeriod;
S
Shengliang Guan 已提交
262
  int8_t  walLevel;
S
Shengliang Guan 已提交
263 264 265 266 267 268 269 270
  int8_t  precision;
  int8_t  compression;
  int8_t  replications;
  int8_t  quorum;
  int8_t  update;
  int8_t  cacheLastRow;
} SDbCfg;

S
Shengliang Guan 已提交
271
typedef struct {
L
Liu Jicong 已提交
272 273
  char     name[TSDB_DB_FNAME_LEN];
  char     acct[TSDB_USER_LEN];
S
Shengliang Guan 已提交
274
  char     createUser[TSDB_USER_LEN];
L
Liu Jicong 已提交
275 276
  int64_t  createdTime;
  int64_t  updateTime;
H
Haojun Liao 已提交
277
  uint64_t uid;
L
Liu Jicong 已提交
278 279 280 281
  int32_t  cfgVersion;
  int32_t  vgVersion;
  int8_t   hashMethod;  // default is 1
  SDbCfg   cfg;
S
Shengliang Guan 已提交
282 283 284 285
} SDbObj;

typedef struct {
  int32_t    dnodeId;
S
Shengliang Guan 已提交
286
  ESyncState role;
S
Shengliang Guan 已提交
287 288
} SVnodeGid;

S
Shengliang Guan 已提交
289
typedef struct {
S
Shengliang Guan 已提交
290
  int32_t   vgId;
S
Shengliang Guan 已提交
291 292
  int64_t   createdTime;
  int64_t   updateTime;
S
Shengliang Guan 已提交
293
  int32_t   version;
S
Shengliang Guan 已提交
294 295
  uint32_t  hashBegin;
  uint32_t  hashEnd;
296
  char      dbName[TSDB_DB_FNAME_LEN];
S
Shengliang Guan 已提交
297
  int64_t   dbUid;
S
Shengliang Guan 已提交
298 299
  int64_t   numOfTables;
  int64_t   numOfTimeSeries;
S
Shengliang Guan 已提交
300 301 302
  int64_t   totalStorage;
  int64_t   compStorage;
  int64_t   pointsWritten;
S
Shengliang Guan 已提交
303 304 305
  int8_t    compact;
  int8_t    replica;
  SVnodeGid vnodeGid[TSDB_MAX_REPLICA];
S
Shengliang Guan 已提交
306 307
} SVgObj;

S
Shengliang Guan 已提交
308
typedef struct {
S
Shengliang Guan 已提交
309
  char     name[TSDB_TABLE_FNAME_LEN];
310
  char     db[TSDB_DB_FNAME_LEN];
311 312
  int64_t  createdTime;
  int64_t  updateTime;
S
Shengliang Guan 已提交
313
  uint64_t uid;
S
Shengliang Guan 已提交
314
  uint64_t dbUid;
S
Shengliang Guan 已提交
315
  int32_t  version;
S
Shengliang Guan 已提交
316
  int32_t  nextColId;
S
Shengliang Guan 已提交
317 318
  int32_t  numOfColumns;
  int32_t  numOfTags;
S
Shengliang Guan 已提交
319
  SSchema* pColumns;
S
Shengliang Guan 已提交
320
  SSchema* pTags;
S
Shengliang Guan 已提交
321
  SRWLatch lock;
S
Shengliang Guan 已提交
322
  char     comment[TSDB_STB_COMMENT_LEN];
S
Shengliang Guan 已提交
323
} SStbObj;
S
Shengliang Guan 已提交
324

S
Shengliang Guan 已提交
325
typedef struct {
S
Shengliang Guan 已提交
326 327
  char    name[TSDB_FUNC_NAME_LEN];
  int64_t createdTime;
S
Shengliang Guan 已提交
328 329 330 331 332 333
  int8_t  funcType;
  int8_t  scriptType;
  int8_t  align;
  int8_t  outputType;
  int32_t outputLen;
  int32_t bufSize;
S
Shengliang 已提交
334
  int64_t signature;
S
Shengliang Guan 已提交
335 336
  int32_t commentSize;
  int32_t codeSize;
L
Liu Jicong 已提交
337 338
  char*   pComment;
  char*   pCode;
S
Shengliang Guan 已提交
339
  char    pData[];
S
Shengliang Guan 已提交
340 341
} SFuncObj;

S
Shengliang Guan 已提交
342
typedef struct {
343
  int64_t id;
S
Shengliang Guan 已提交
344 345 346 347 348 349 350
  int8_t  type;
  int8_t  replica;
  int16_t numOfColumns;
  int32_t rowSize;
  int32_t numOfRows;
  int32_t numOfReads;
  int32_t payloadLen;
L
Liu Jicong 已提交
351 352
  void*   pIter;
  SMnode* pMnode;
353
  char    db[TSDB_DB_FNAME_LEN];
S
Shengliang Guan 已提交
354 355 356
  int16_t offset[TSDB_MAX_COLUMNS];
  int32_t bytes[TSDB_MAX_COLUMNS];
  char    payload[];
S
Shengliang Guan 已提交
357 358
} SShowObj;

L
Liu Jicong 已提交
359 360 361 362 363 364 365
typedef struct {
  int32_t vgId;  // -1 for unassigned
  int32_t status;
  SEpSet  epSet;
  int64_t oldConsumerId;
  int64_t consumerId;  // -1 for unassigned
  char*   qmsg;
L
Liu Jicong 已提交
366 367
} SMqConsumerEp;

L
Liu Jicong 已提交
368
static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, const SMqConsumerEp* pConsumerEp) {
L
Liu Jicong 已提交
369 370
  int32_t tlen = 0;
  tlen += taosEncodeFixedI32(buf, pConsumerEp->vgId);
L
Liu Jicong 已提交
371
  tlen += taosEncodeFixedI32(buf, pConsumerEp->status);
372
  tlen += taosEncodeSEpSet(buf, &pConsumerEp->epSet);
L
Liu Jicong 已提交
373
  tlen += taosEncodeFixedI64(buf, pConsumerEp->oldConsumerId);
L
Liu Jicong 已提交
374
  tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
L
Liu Jicong 已提交
375
  tlen += taosEncodeString(buf, pConsumerEp->qmsg);
L
Liu Jicong 已提交
376 377 378 379 380
  return tlen;
}

static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) {
  buf = taosDecodeFixedI32(buf, &pConsumerEp->vgId);
L
Liu Jicong 已提交
381
  buf = taosDecodeFixedI32(buf, &pConsumerEp->status);
382
  buf = taosDecodeSEpSet(buf, &pConsumerEp->epSet);
L
Liu Jicong 已提交
383
  buf = taosDecodeFixedI64(buf, &pConsumerEp->oldConsumerId);
L
Liu Jicong 已提交
384
  buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
L
Liu Jicong 已提交
385
  buf = taosDecodeString(buf, &pConsumerEp->qmsg);
L
Liu Jicong 已提交
386 387 388
  return buf;
}

L
Liu Jicong 已提交
389 390 391 392 393 394
static FORCE_INLINE void tDeleteSMqConsumerEp(SMqConsumerEp* pConsumerEp) {
  if (pConsumerEp) {
    tfree(pConsumerEp->qmsg);
  }
}

L
Liu Jicong 已提交
395 396 397 398
typedef struct {
  int64_t consumerId;
  SArray* vgInfo;  // SArray<SMqConsumerEp>
} SMqSubConsumer;
L
Liu Jicong 已提交
399

L
Liu Jicong 已提交
400 401 402 403
static FORCE_INLINE int32_t tEncodeSMqSubConsumer(void** buf, const SMqSubConsumer* pConsumer) {
  int32_t tlen = 0;
  tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
  int32_t sz = taosArrayGetSize(pConsumer->vgInfo);
L
Liu Jicong 已提交
404
  tlen += taosEncodeFixedI32(buf, sz);
L
Liu Jicong 已提交
405 406 407
  for (int32_t i = 0; i < sz; i++) {
    SMqConsumerEp* pCEp = taosArrayGet(pConsumer->vgInfo, i);
    tlen += tEncodeSMqConsumerEp(buf, pCEp);
L
Liu Jicong 已提交
408
  }
L
Liu Jicong 已提交
409 410
  return tlen;
}
L
Liu Jicong 已提交
411

L
Liu Jicong 已提交
412 413 414 415 416 417 418 419 420
static FORCE_INLINE void* tDecodeSMqSubConsumer(void** buf, SMqSubConsumer* pConsumer) {
  int32_t sz;
  buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
  buf = taosDecodeFixedI32(buf, &sz);
  pConsumer->vgInfo = taosArrayInit(sz, sizeof(SMqConsumerEp));
  for (int32_t i = 0; i < sz; i++) {
    SMqConsumerEp consumerEp;
    buf = tDecodeSMqConsumerEp(buf, &consumerEp);
    taosArrayPush(pConsumer->vgInfo, &consumerEp);
L
Liu Jicong 已提交
421
  }
L
Liu Jicong 已提交
422 423 424 425 426 427 428
  return buf;
}

static FORCE_INLINE void tDeleteSMqSubConsumer(SMqSubConsumer* pSubConsumer) {
  if (pSubConsumer->vgInfo) {
    taosArrayDestroyEx(pSubConsumer->vgInfo, (void (*)(void*))tDeleteSMqConsumerEp);
    pSubConsumer->vgInfo = NULL;
L
Liu Jicong 已提交
429
  }
L
Liu Jicong 已提交
430 431 432 433 434 435 436
}

typedef struct {
  char    key[TSDB_SUBSCRIBE_KEY_LEN];
  int32_t status;
  int32_t vgNum;
  SArray* consumers;     // SArray<SMqSubConsumer>
L
Liu Jicong 已提交
437
  SArray* lostConsumers; // SArray<SMqSubConsumer>
L
Liu Jicong 已提交
438 439 440 441 442 443
  SArray* unassignedVg;  // SArray<SMqConsumerEp>
} SMqSubscribeObj;

static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() {
  SMqSubscribeObj* pSub = calloc(1, sizeof(SMqSubscribeObj));
  if (pSub == NULL) {
L
Liu Jicong 已提交
444 445
    return NULL;
  }
L
Liu Jicong 已提交
446

L
Liu Jicong 已提交
447 448 449
  pSub->consumers = taosArrayInit(0, sizeof(SMqSubConsumer));
  if (pSub->consumers == NULL) {
    goto _err;
L
Liu Jicong 已提交
450
  }
L
Liu Jicong 已提交
451 452 453 454 455 456

  pSub->lostConsumers = taosArrayInit(0, sizeof(SMqSubConsumer));
  if (pSub->lostConsumers == NULL) {
    goto _err;
  }

L
Liu Jicong 已提交
457
  pSub->unassignedVg = taosArrayInit(0, sizeof(SMqConsumerEp));
L
Liu Jicong 已提交
458
  if (pSub->unassignedVg == NULL) {
L
Liu Jicong 已提交
459
    goto _err;
L
Liu Jicong 已提交
460
  }
L
Liu Jicong 已提交
461 462 463 464 465

  pSub->key[0] = 0;
  pSub->vgNum = 0;
  pSub->status = 0;

L
Liu Jicong 已提交
466
  return pSub;
L
Liu Jicong 已提交
467 468 469

_err:
  tfree(pSub->consumers);
L
Liu Jicong 已提交
470 471
  tfree(pSub->lostConsumers);
  tfree(pSub->unassignedVg);
L
Liu Jicong 已提交
472 473
  tfree(pSub);
  return NULL;
L
Liu Jicong 已提交
474 475 476 477 478
}

static FORCE_INLINE int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeObj* pSub) {
  int32_t tlen = 0;
  tlen += taosEncodeString(buf, pSub->key);
L
Liu Jicong 已提交
479 480
  tlen += taosEncodeFixedI32(buf, pSub->vgNum);
  tlen += taosEncodeFixedI32(buf, pSub->status);
L
Liu Jicong 已提交
481 482
  int32_t sz;

L
Liu Jicong 已提交
483
  sz = taosArrayGetSize(pSub->consumers);
L
Liu Jicong 已提交
484 485
  tlen += taosEncodeFixedI32(buf, sz);
  for (int32_t i = 0; i < sz; i++) {
L
Liu Jicong 已提交
486 487
    SMqSubConsumer* pSubConsumer = taosArrayGet(pSub->consumers, i);
    tlen += tEncodeSMqSubConsumer(buf, pSubConsumer);
L
Liu Jicong 已提交
488 489
  }

L
Liu Jicong 已提交
490 491 492 493 494 495 496
  sz = taosArrayGetSize(pSub->lostConsumers);
  tlen += taosEncodeFixedI32(buf, sz);
  for (int32_t i = 0; i < sz; i++) {
    SMqSubConsumer* pSubConsumer = taosArrayGet(pSub->lostConsumers, i);
    tlen += tEncodeSMqSubConsumer(buf, pSubConsumer);
  }

L
Liu Jicong 已提交
497 498 499 500 501 502 503 504 505 506 507 508
  sz = taosArrayGetSize(pSub->unassignedVg);
  tlen += taosEncodeFixedI32(buf, sz);
  for (int32_t i = 0; i < sz; i++) {
    SMqConsumerEp* pCEp = taosArrayGet(pSub->unassignedVg, i);
    tlen += tEncodeSMqConsumerEp(buf, pCEp);
  }

  return tlen;
}

static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) {
  buf = taosDecodeStringTo(buf, pSub->key);
L
Liu Jicong 已提交
509 510
  buf = taosDecodeFixedI32(buf, &pSub->vgNum);
  buf = taosDecodeFixedI32(buf, &pSub->status);
L
Liu Jicong 已提交
511 512 513 514

  int32_t sz;

  buf = taosDecodeFixedI32(buf, &sz);
L
Liu Jicong 已提交
515 516
  pSub->consumers = taosArrayInit(sz, sizeof(SMqSubConsumer));
  if (pSub->consumers == NULL) {
L
Liu Jicong 已提交
517 518 519
    return NULL;
  }
  for (int32_t i = 0; i < sz; i++) {
L
Liu Jicong 已提交
520 521 522
    SMqSubConsumer subConsumer = {0};
    buf = tDecodeSMqSubConsumer(buf, &subConsumer);
    taosArrayPush(pSub->consumers, &subConsumer);
L
Liu Jicong 已提交
523 524
  }

L
Liu Jicong 已提交
525 526 527 528 529 530 531 532 533 534 535
  buf = taosDecodeFixedI32(buf, &sz);
  pSub->lostConsumers = taosArrayInit(sz, sizeof(SMqSubConsumer));
  if (pSub->lostConsumers == NULL) {
    return NULL;
  }
  for (int32_t i = 0; i < sz; i++) {
    SMqSubConsumer subConsumer = {0};
    buf = tDecodeSMqSubConsumer(buf, &subConsumer);
    taosArrayPush(pSub->lostConsumers, &subConsumer);
  }

L
Liu Jicong 已提交
536 537 538 539 540 541
  buf = taosDecodeFixedI32(buf, &sz);
  pSub->unassignedVg = taosArrayInit(sz, sizeof(SMqConsumerEp));
  if (pSub->unassignedVg == NULL) {
    return NULL;
  }
  for (int32_t i = 0; i < sz; i++) {
L
Liu Jicong 已提交
542 543 544
    SMqConsumerEp consumerEp = {0};
    buf = tDecodeSMqConsumerEp(buf, &consumerEp);
    taosArrayPush(pSub->unassignedVg, &consumerEp);
L
Liu Jicong 已提交
545 546 547
  }
  return buf;
}
L
Liu Jicong 已提交
548

L
Liu Jicong 已提交
549
static FORCE_INLINE void tDeleteSMqSubscribeObj(SMqSubscribeObj* pSub) {
L
Liu Jicong 已提交
550 551 552 553
  if (pSub->consumers) {
    taosArrayDestroyEx(pSub->consumers, (void (*)(void*))tDeleteSMqSubConsumer);
    //taosArrayDestroy(pSub->consumers);
    pSub->consumers = NULL;
L
Liu Jicong 已提交
554
  }
L
Liu Jicong 已提交
555

L
Liu Jicong 已提交
556
  if (pSub->unassignedVg) {
L
Liu Jicong 已提交
557 558
    taosArrayDestroyEx(pSub->unassignedVg, (void (*)(void*))tDeleteSMqConsumerEp);
    //taosArrayDestroy(pSub->unassignedVg);
L
Liu Jicong 已提交
559 560
    pSub->unassignedVg = NULL;
  }
L
Liu Jicong 已提交
561 562
}

L
Liu Jicong 已提交
563
typedef struct {
L
Liu Jicong 已提交
564 565 566 567
  char     name[TSDB_TOPIC_FNAME_LEN];
  char     db[TSDB_DB_FNAME_LEN];
  int64_t  createTime;
  int64_t  updateTime;
L
Liu Jicong 已提交
568
  int64_t  uid;
L
Liu Jicong 已提交
569
  int64_t  dbUid;
L
Liu Jicong 已提交
570 571 572 573 574 575
  int32_t  version;
  SRWLatch lock;
  int32_t  sqlLen;
  char*    sql;
  char*    logicalPlan;
  char*    physicalPlan;
L
Liu Jicong 已提交
576 577
} SMqTopicObj;

L
Liu Jicong 已提交
578
typedef struct {
L
Liu Jicong 已提交
579
  int64_t  consumerId;
L
Liu Jicong 已提交
580
  int64_t  connId;
L
Liu Jicong 已提交
581
  SRWLatch lock;
L
Liu Jicong 已提交
582
  char     cgroup[TSDB_CONSUMER_GROUP_LEN];
L
Liu Jicong 已提交
583 584
  SArray*  currentTopics;  // SArray<char*>
  SArray*  recentRemovedTopics;   // SArray<char*>
L
Liu Jicong 已提交
585
  int32_t  epoch;
L
Liu Jicong 已提交
586
  // stat
L
Liu Jicong 已提交
587 588 589 590 591 592 593
  int64_t pollCnt;
  // status
  int32_t status;
  // heartbeat from the consumer reset hbStatus to 0
  // each checkConsumerAlive msg add hbStatus by 1
  // if checkConsumerAlive > CONSUMER_REBALANCE_CNT, mask to lost
  int32_t hbStatus;
L
Liu Jicong 已提交
594 595
} SMqConsumerObj;

L
Liu Jicong 已提交
596
static FORCE_INLINE int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer) {
L
Liu Jicong 已提交
597
  int32_t sz;
L
Liu Jicong 已提交
598 599
  int32_t tlen = 0;
  tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
L
Liu Jicong 已提交
600
  tlen += taosEncodeFixedI64(buf, pConsumer->connId);
L
Liu Jicong 已提交
601
  tlen += taosEncodeFixedI32(buf, pConsumer->epoch);
L
Liu Jicong 已提交
602
  tlen += taosEncodeFixedI64(buf, pConsumer->pollCnt);
L
Liu Jicong 已提交
603
  tlen += taosEncodeFixedI32(buf, pConsumer->status);
L
Liu Jicong 已提交
604
  tlen += taosEncodeString(buf, pConsumer->cgroup);
L
Liu Jicong 已提交
605 606 607 608 609 610 611 612 613

  sz = taosArrayGetSize(pConsumer->currentTopics);
  tlen += taosEncodeFixedI32(buf, sz);
  for (int32_t i = 0; i < sz; i++) {
    char* topic = taosArrayGetP(pConsumer->currentTopics, i);
    tlen += taosEncodeString(buf, topic);
  }

  sz = taosArrayGetSize(pConsumer->recentRemovedTopics);
L
Liu Jicong 已提交
614 615
  tlen += taosEncodeFixedI32(buf, sz);
  for (int32_t i = 0; i < sz; i++) {
L
Liu Jicong 已提交
616
    char* topic = taosArrayGetP(pConsumer->recentRemovedTopics, i);
L
Liu Jicong 已提交
617
    tlen += taosEncodeString(buf, topic);
L
Liu Jicong 已提交
618 619 620 621 622
  }
  return tlen;
}

static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pConsumer) {
L
Liu Jicong 已提交
623
  int32_t sz;
L
Liu Jicong 已提交
624
  buf = taosDecodeFixedI64(buf, &pConsumer->consumerId);
L
Liu Jicong 已提交
625
  buf = taosDecodeFixedI64(buf, &pConsumer->connId);
L
Liu Jicong 已提交
626
  buf = taosDecodeFixedI32(buf, &pConsumer->epoch);
L
Liu Jicong 已提交
627
  buf = taosDecodeFixedI64(buf, &pConsumer->pollCnt);
L
Liu Jicong 已提交
628
  buf = taosDecodeFixedI32(buf, &pConsumer->status);
L
Liu Jicong 已提交
629
  buf = taosDecodeStringTo(buf, pConsumer->cgroup);
L
Liu Jicong 已提交
630 631 632 633 634 635 636 637 638

  buf = taosDecodeFixedI32(buf, &sz);
  pConsumer->currentTopics = taosArrayInit(sz, sizeof(SMqConsumerObj));
  for (int32_t i = 0; i < sz; i++) {
    char* topic;
    buf = taosDecodeString(buf, &topic);
    taosArrayPush(pConsumer->currentTopics, &topic);
  }

L
Liu Jicong 已提交
639
  buf = taosDecodeFixedI32(buf, &sz);
L
Liu Jicong 已提交
640
  pConsumer->recentRemovedTopics = taosArrayInit(sz, sizeof(SMqConsumerObj));
L
Liu Jicong 已提交
641
  for (int32_t i = 0; i < sz; i++) {
L
Liu Jicong 已提交
642 643
    char* topic;
    buf = taosDecodeString(buf, &topic);
L
Liu Jicong 已提交
644
    taosArrayPush(pConsumer->recentRemovedTopics, &topic);
L
Liu Jicong 已提交
645 646 647 648
  }
  return buf;
}

S
Shengliang Guan 已提交
649
typedef struct SMnodeMsg {
S
Shengliang Guan 已提交
650
  char    user[TSDB_USER_LEN];
651
  char    db[TSDB_DB_FNAME_LEN];
S
Shengliang Guan 已提交
652
  int32_t acctId;
L
Liu Jicong 已提交
653
  SMnode* pMnode;
S
Shengliang Guan 已提交
654 655 656
  int64_t createdTime;
  SRpcMsg rpcMsg;
  int32_t contLen;
L
Liu Jicong 已提交
657
  void*   pCont;
S
Shengliang Guan 已提交
658
} SMnodeMsg;
S
Shengliang Guan 已提交
659

S
Shengliang Guan 已提交
660 661 662
#ifdef __cplusplus
}
#endif
S
Shengliang Guan 已提交
663

S
Shengliang Guan 已提交
664
#endif /*_TD_MND_DEF_H_*/