mndDef.h 15.3 KB
Newer Older
H
refact  
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

S
Shengliang Guan 已提交
16 17
#ifndef _TD_MND_DEF_H_
#define _TD_MND_DEF_H_
S
Shengliang Guan 已提交
18

19
#include "os.h"
S
Shengliang Guan 已提交
20 21

#include "cJSON.h"
L
Liu Jicong 已提交
22
#include "scheduler.h"
S
Shengliang Guan 已提交
23 24
#include "sync.h"
#include "thash.h"
L
Liu Jicong 已提交
25
#include "tlist.h"
S
Shengliang Guan 已提交
26
#include "tlog.h"
L
Liu Jicong 已提交
27
#include "tmsg.h"
S
Shengliang Guan 已提交
28
#include "trpc.h"
L
Liu Jicong 已提交
29
#include "tstream.h"
S
Shengliang Guan 已提交
30
#include "ttimer.h"
S
Shengliang Guan 已提交
31

S
Shengliang Guan 已提交
32
#include "mnode.h"
S
Shengliang Guan 已提交
33

S
Shengliang Guan 已提交
34 35 36
#ifdef __cplusplus
extern "C" {
#endif
S
Shengliang Guan 已提交
37

S
Shengliang Guan 已提交
38
// Authorization target categories. START and MAX are range markers: START is 0 and
// MAX takes the value following the last real category.
typedef enum {
  MND_AUTH_ACCT_START = 0,
  MND_AUTH_ACCT_USER,
  MND_AUTH_ACCT_DNODE,
  MND_AUTH_ACCT_MNODE,
  MND_AUTH_ACCT_DB,
  MND_AUTH_ACCT_TABLE,
  MND_AUTH_ACCT_MAX
} EAuthAcct;
S
Shengliang Guan 已提交
47 48

// Authorization operation codes for user management. START (0) and MAX bracket the
// valid operations.
typedef enum {
  MND_AUTH_OP_START = 0,
  MND_AUTH_OP_CREATE_USER,
  MND_AUTH_OP_ALTER_USER,
  MND_AUTH_OP_DROP_USER,
  // NOTE(review): spelled MND_AUTH_MAX rather than MND_AUTH_OP_MAX, unlike the START
  // marker above; the name is kept so existing users of it keep compiling.
  MND_AUTH_MAX
} EAuthOp;
S
Shengliang Guan 已提交
55

S
Shengliang Guan 已提交
56 57 58 59 60
// Transaction step kind: a log step (1) or an action step (2).
typedef enum {
  TRN_STEP_LOG = 1,
  TRN_STEP_ACTION = 2,
} ETrnStep;

S
Shengliang Guan 已提交
61
// Transaction stage, used as the type of STrans.stage. Values are explicit and
// contiguous from TRN_STAGE_PREPARE (0) to TRN_STAGE_FINISHED (8).
typedef enum {
  TRN_STAGE_PREPARE = 0,
  TRN_STAGE_REDO_LOG = 1,
  TRN_STAGE_REDO_ACTION = 2,
  TRN_STAGE_ROLLBACK = 3,
  TRN_STAGE_UNDO_ACTION = 4,
  TRN_STAGE_UNDO_LOG = 5,
  TRN_STAGE_COMMIT = 6,
  TRN_STAGE_COMMIT_LOG = 7,
  TRN_STAGE_FINISHED = 8
} ETrnStage;

S
Shengliang Guan 已提交
73
// Transaction type identifiers, used as the type of STrans.type. Values are grouped
// into numeric scopes (basic 1000+, global 2000+, db 3000+, stb 4000+). Each scope
// opens with a *_SCOPE marker and closes with a *_SCOPE_END marker, which takes the
// value following the last enumerator listed before it.
typedef enum {
  TRN_TYPE_BASIC_SCOPE = 1000,
  TRN_TYPE_CREATE_ACCT = 1001,
  TRN_TYPE_CREATE_CLUSTER = 1002,
  TRN_TYPE_CREATE_USER = 1003,
  TRN_TYPE_ALTER_USER = 1004,
  TRN_TYPE_DROP_USER = 1005,
  TRN_TYPE_CREATE_FUNC = 1006,
  TRN_TYPE_DROP_FUNC = 1007,

  TRN_TYPE_CREATE_SNODE = 1010,
  TRN_TYPE_DROP_SNODE = 1011,
  TRN_TYPE_CREATE_QNODE = 1012,
  // Must stay inside (BASIC_SCOPE, BASIC_SCOPE_END); the previous value 10013 was a
  // typo that placed it outside every scope range.
  TRN_TYPE_DROP_QNODE = 1013,
  TRN_TYPE_CREATE_BNODE = 1014,
  TRN_TYPE_DROP_BNODE = 1015,
  TRN_TYPE_CREATE_MNODE = 1016,
  TRN_TYPE_DROP_MNODE = 1017,

  TRN_TYPE_CREATE_TOPIC = 1020,
  TRN_TYPE_DROP_TOPIC = 1021,
  TRN_TYPE_SUBSCRIBE = 1022,
  TRN_TYPE_REBALANCE = 1023,
  TRN_TYPE_COMMIT_OFFSET = 1024,
  TRN_TYPE_CREATE_STREAM = 1025,
  TRN_TYPE_DROP_STREAM = 1026,
  TRN_TYPE_ALTER_STREAM = 1027,
  TRN_TYPE_CONSUMER_LOST = 1028,
  TRN_TYPE_CONSUMER_RECOVER = 1029,
  TRN_TYPE_DROP_CGROUP = 1030,
  TRN_TYPE_BASIC_SCOPE_END,

  TRN_TYPE_GLOBAL_SCOPE = 2000,
  TRN_TYPE_CREATE_DNODE = 2001,
  TRN_TYPE_DROP_DNODE = 2002,
  TRN_TYPE_GLOBAL_SCOPE_END,

  TRN_TYPE_DB_SCOPE = 3000,
  TRN_TYPE_CREATE_DB = 3001,
  TRN_TYPE_ALTER_DB = 3002,
  TRN_TYPE_DROP_DB = 3003,
  TRN_TYPE_SPLIT_VGROUP = 3004,
  // NOTE(review): 3015 breaks the 3001..3004 sequence (3005 expected?). It is still
  // inside the db scope, so it is left unchanged; confirm before renumbering.
  TRN_TYPE_MERGE_VGROUP = 3015,
  TRN_TYPE_DB_SCOPE_END,

  TRN_TYPE_STB_SCOPE = 4000,
  TRN_TYPE_CREATE_STB = 4001,
  TRN_TYPE_ALTER_STB = 4002,
  TRN_TYPE_DROP_STB = 4003,
  TRN_TYPE_CREATE_SMA = 4004,
  TRN_TYPE_DROP_SMA = 4005,
  TRN_TYPE_STB_SCOPE_END,
} ETrnType;

S
Shengliang Guan 已提交
127 128 129 130
// Transaction policy: rollback (0) or retry (1). Used as the type of STrans.policy.
typedef enum {
  TRN_POLICY_ROLLBACK = 0,
  TRN_POLICY_RETRY = 1,
} ETrnPolicy;
S
Shengliang Guan 已提交
131

132 133 134 135 136
// Transaction execution mode: parallel (0) or one-by-one (1). Used as the type of
// STrans.parallel.
typedef enum {
  TRN_EXEC_PARALLEL = 0,
  TRN_EXEC_ONE_BY_ONE = 1,
} ETrnExecType;

S
Shengliang Guan 已提交
137 138 139 140 141 142 143 144 145 146 147 148 149 150
// Dnode status reason codes, used as the type of SDnodeObj.offlineReason.
// Numeric values are spelled out; they match the implicit numbering starting at 0.
typedef enum {
  DND_REASON_ONLINE = 0,
  DND_REASON_STATUS_MSG_TIMEOUT = 1,
  DND_REASON_STATUS_NOT_RECEIVED = 2,
  DND_REASON_VERSION_NOT_MATCH = 3,
  DND_REASON_DNODE_ID_NOT_MATCH = 4,
  DND_REASON_CLUSTER_ID_NOT_MATCH = 5,
  DND_REASON_STATUS_INTERVAL_NOT_MATCH = 6,
  DND_REASON_TIME_ZONE_NOT_MATCH = 7,
  DND_REASON_LOCALE_NOT_MATCH = 8,
  DND_REASON_CHARSET_NOT_MATCH = 9,
  DND_REASON_OTHERS = 10
} EDndReason;

S
Shengliang Guan 已提交
151 152 153 154 155 156 157 158 159
// Consumer update kinds. Numbering starts at 1; values are spelled out and match the
// implicit sequence.
// NOTE(review): presumably the value carried in SMqConsumerObj.updateType — confirm.
typedef enum {
  CONSUMER_UPDATE__TOUCH = 1,
  CONSUMER_UPDATE__ADD = 2,
  CONSUMER_UPDATE__REMOVE = 3,
  CONSUMER_UPDATE__LOST = 4,
  CONSUMER_UPDATE__RECOVER = 5,
  CONSUMER_UPDATE__MODIFY = 6,
} ECsmUpdateType;

S
Shengliang Guan 已提交
160
typedef struct {
S
Shengliang Guan 已提交
161 162 163 164
  int32_t        id;
  ETrnStage      stage;
  ETrnPolicy     policy;
  ETrnType       type;
165
  ETrnExecType   parallel;
S
Shengliang Guan 已提交
166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184
  int32_t        code;
  int32_t        failedTimes;
  SRpcHandleInfo rpcInfo;
  void*          rpcRsp;
  int32_t        rpcRspLen;
  SArray*        redoLogs;
  SArray*        undoLogs;
  SArray*        commitLogs;
  SArray*        redoActions;
  SArray*        undoActions;
  int64_t        createdTime;
  int64_t        lastExecTime;
  int64_t        dbUid;
  char           dbname[TSDB_DB_FNAME_LEN];
  char           lastError[TSDB_TRANS_ERROR_LEN];
  int32_t        startFunc;
  int32_t        stopFunc;
  int32_t        paramLen;
  void*          param;
S
Shengliang Guan 已提交
185
} STrans;
S
Shengliang Guan 已提交
186

S
Shengliang Guan 已提交
187
typedef struct {
188
  int64_t id;
S
Shengliang Guan 已提交
189
  char    name[TSDB_CLUSTER_ID_LEN];
S
Shengliang Guan 已提交
190 191 192 193
  int64_t createdTime;
  int64_t updateTime;
} SClusterObj;

S
Shengliang Guan 已提交
194
typedef struct {
S
Shengliang Guan 已提交
195 196 197 198
  int32_t    id;
  int64_t    createdTime;
  int64_t    updateTime;
  int64_t    rebootTime;
199
  int64_t    lastAccessTime;
S
Shengliang Guan 已提交
200
  int32_t    accessTimes;
S
Shengliang Guan 已提交
201
  int32_t    numOfVnodes;
S
Shengliang Guan 已提交
202 203
  int32_t    numOfSupportVnodes;
  int32_t    numOfCores;
S
Shengliang Guan 已提交
204 205 206 207
  EDndReason offlineReason;
  uint16_t   port;
  char       fqdn[TSDB_FQDN_LEN];
  char       ep[TSDB_EP_LEN];
S
Shengliang Guan 已提交
208 209
} SDnodeObj;

S
Shengliang Guan 已提交
210
typedef struct {
S
Shengliang Guan 已提交
211
  int32_t    id;
S
Shengliang Guan 已提交
212 213
  int64_t    createdTime;
  int64_t    updateTime;
214 215
  ESyncState state;
  int64_t    stateStartTime;
L
Liu Jicong 已提交
216
  SDnodeObj* pDnode;
S
Shengliang Guan 已提交
217 218
} SMnodeObj;

S
Shengliang Guan 已提交
219 220 221 222
typedef struct {
  int32_t    id;
  int64_t    createdTime;
  int64_t    updateTime;
L
Liu Jicong 已提交
223
  SDnodeObj* pDnode;
S
Shengliang Guan 已提交
224 225 226 227 228 229
} SQnodeObj;

typedef struct {
  int32_t    id;
  int64_t    createdTime;
  int64_t    updateTime;
L
Liu Jicong 已提交
230
  SDnodeObj* pDnode;
S
Shengliang Guan 已提交
231 232 233 234 235 236
} SSnodeObj;

typedef struct {
  int32_t    id;
  int64_t    createdTime;
  int64_t    updateTime;
L
Liu Jicong 已提交
237
  SDnodeObj* pDnode;
S
Shengliang Guan 已提交
238 239
} SBnodeObj;

S
Shengliang Guan 已提交
240 241 242
// Account configuration: per-account maximums plus the access state.
// Embedded in SAcctObj as `cfg`.
typedef struct {
  int32_t maxUsers;
  int32_t maxDbs;
  int32_t maxStbs;
  int32_t maxTbs;
  int32_t maxTimeSeries;
  int32_t maxStreams;
  int32_t maxFuncs;
  int32_t maxConsumers;
  int32_t maxConns;
  int32_t maxTopics;
  int64_t maxStorage;
  int32_t accessState;  // Configured only by command
} SAcctCfg;

// Account usage counters. Embedded in SAcctObj as `info`.
typedef struct {
  int32_t numOfUsers;
  int32_t numOfDbs;
  int32_t numOfTimeSeries;
  int32_t numOfStreams;
  int64_t totalStorage;  // Total storage written from this account
  int64_t compStorage;   // Compressed storage on disk
} SAcctInfo;

S
Shengliang Guan 已提交
264
typedef struct {
S
Shengliang Guan 已提交
265 266 267 268
  char      acct[TSDB_USER_LEN];
  int64_t   createdTime;
  int64_t   updateTime;
  int32_t   acctId;
S
Shengliang Guan 已提交
269
  int32_t   status;
S
Shengliang Guan 已提交
270 271 272 273
  SAcctCfg  cfg;
  SAcctInfo info;
} SAcctObj;

S
Shengliang Guan 已提交
274
typedef struct {
S
Shengliang Guan 已提交
275
  char      user[TSDB_USER_LEN];
276
  char      pass[TSDB_PASSWORD_LEN];
S
Shengliang Guan 已提交
277 278 279
  char      acct[TSDB_USER_LEN];
  int64_t   createdTime;
  int64_t   updateTime;
280
  int8_t    superUser;
S
Shengliang Guan 已提交
281
  int32_t   acctId;
D
dapan 已提交
282
  int32_t   authVersion;
283 284
  SHashObj* readDbs;
  SHashObj* writeDbs;
S
Shengliang Guan 已提交
285
  SRWLatch  lock;
S
Shengliang Guan 已提交
286 287 288
} SUserObj;

typedef struct {
289
  int32_t numOfVgroups;
S
Shengliang Guan 已提交
290 291 292 293
  int32_t numOfStables;
  int32_t buffer;
  int32_t pageSize;
  int32_t pages;
S
Shengliang Guan 已提交
294 295 296 297
  int32_t daysPerFile;
  int32_t daysToKeep0;
  int32_t daysToKeep1;
  int32_t daysToKeep2;
S
Shengliang Guan 已提交
298 299
  int32_t minRows;
  int32_t maxRows;
S
Shengliang Guan 已提交
300
  int32_t fsyncPeriod;
S
Shengliang Guan 已提交
301
  int8_t  walLevel;
S
Shengliang Guan 已提交
302 303 304
  int8_t  precision;
  int8_t  compression;
  int8_t  replications;
S
Shengliang Guan 已提交
305
  int8_t  strict;
S
Shengliang Guan 已提交
306
  int8_t  cacheLastRow;
S
Shengliang Guan 已提交
307
  int8_t  hashMethod;  // default is 1
S
sma  
Shengliang Guan 已提交
308 309
  int32_t numOfRetensions;
  SArray* pRetensions;
S
Shengliang Guan 已提交
310 311
} SDbCfg;

S
Shengliang Guan 已提交
312
typedef struct {
313 314 315 316 317 318 319 320 321 322
  char     name[TSDB_DB_FNAME_LEN];
  char     acct[TSDB_USER_LEN];
  char     createUser[TSDB_USER_LEN];
  int64_t  createdTime;
  int64_t  updateTime;
  int64_t  uid;
  int32_t  cfgVersion;
  int32_t  vgVersion;
  SDbCfg   cfg;
  SRWLatch lock;
S
Shengliang Guan 已提交
323 324 325 326
} SDbObj;

typedef struct {
  int32_t    dnodeId;
S
Shengliang Guan 已提交
327
  ESyncState role;
S
Shengliang Guan 已提交
328 329
} SVnodeGid;

S
Shengliang Guan 已提交
330
typedef struct {
S
Shengliang Guan 已提交
331
  int32_t   vgId;
S
Shengliang Guan 已提交
332 333
  int64_t   createdTime;
  int64_t   updateTime;
S
Shengliang Guan 已提交
334
  int32_t   version;
S
Shengliang Guan 已提交
335 336
  uint32_t  hashBegin;
  uint32_t  hashEnd;
337
  char      dbName[TSDB_DB_FNAME_LEN];
S
Shengliang Guan 已提交
338
  int64_t   dbUid;
S
Shengliang Guan 已提交
339 340
  int64_t   numOfTables;
  int64_t   numOfTimeSeries;
S
Shengliang Guan 已提交
341 342 343
  int64_t   totalStorage;
  int64_t   compStorage;
  int64_t   pointsWritten;
S
Shengliang Guan 已提交
344
  int8_t    compact;
S
Shengliang Guan 已提交
345
  int8_t    isTsma;
S
Shengliang Guan 已提交
346 347
  int8_t    replica;
  SVnodeGid vnodeGid[TSDB_MAX_REPLICA];
S
Shengliang Guan 已提交
348 349
} SVgObj;

S
Shengliang Guan 已提交
350
typedef struct {
S
Shengliang Guan 已提交
351
  char    name[TSDB_TABLE_FNAME_LEN];
S
sma  
Shengliang Guan 已提交
352 353
  char    stb[TSDB_TABLE_FNAME_LEN];
  char    db[TSDB_DB_FNAME_LEN];
S
Shengliang Guan 已提交
354 355 356
  int64_t createdTime;
  int64_t uid;
  int64_t stbUid;
S
sma  
Shengliang Guan 已提交
357
  int64_t dbUid;
S
Shengliang Guan 已提交
358 359 360
  int8_t  intervalUnit;
  int8_t  slidingUnit;
  int8_t  timezone;
S
sma  
Shengliang Guan 已提交
361
  int32_t dstVgId;  // for stream
S
Shengliang Guan 已提交
362 363 364
  int64_t interval;
  int64_t offset;
  int64_t sliding;
S
sma  
Shengliang Guan 已提交
365
  int32_t exprLen;  // strlen + 1
S
Shengliang Guan 已提交
366
  int32_t tagsFilterLen;
S
sma  
Shengliang Guan 已提交
367 368
  int32_t sqlLen;
  int32_t astLen;
S
Shengliang Guan 已提交
369 370
  char*   expr;
  char*   tagsFilter;
S
sma  
Shengliang Guan 已提交
371 372
  char*   sql;
  char*   ast;
S
Shengliang Guan 已提交
373 374
} SSmaObj;

S
Shengliang Guan 已提交
375
typedef struct {
S
Shengliang Guan 已提交
376
  char     name[TSDB_TABLE_FNAME_LEN];
377
  char     db[TSDB_DB_FNAME_LEN];
378 379
  int64_t  createdTime;
  int64_t  updateTime;
L
Liu Jicong 已提交
380 381
  int64_t  uid;
  int64_t  dbUid;
382 383
  int32_t  tagVer;
  int32_t  colVer;
S
Shengliang Guan 已提交
384
  int32_t  nextColId;
S
sma  
Shengliang Guan 已提交
385 386
  float    xFilesFactor;
  int32_t  delay;
S
sma  
Shengliang Guan 已提交
387
  int32_t  ttl;
S
Shengliang Guan 已提交
388 389
  int32_t  numOfColumns;
  int32_t  numOfTags;
S
sma  
Shengliang Guan 已提交
390
  int32_t  commentLen;
391 392
  int32_t  ast1Len;
  int32_t  ast2Len;
S
Shengliang Guan 已提交
393
  SSchema* pColumns;
S
Shengliang Guan 已提交
394
  SSchema* pTags;
S
sma  
Shengliang Guan 已提交
395
  char*    comment;
396 397
  char*    pAst1;
  char*    pAst2;
S
Shengliang Guan 已提交
398
  SRWLatch lock;
S
Shengliang Guan 已提交
399
} SStbObj;
S
Shengliang Guan 已提交
400

S
Shengliang Guan 已提交
401
typedef struct {
S
Shengliang Guan 已提交
402 403
  char    name[TSDB_FUNC_NAME_LEN];
  int64_t createdTime;
S
Shengliang Guan 已提交
404 405 406 407 408 409
  int8_t  funcType;
  int8_t  scriptType;
  int8_t  align;
  int8_t  outputType;
  int32_t outputLen;
  int32_t bufSize;
S
Shengliang 已提交
410
  int64_t signature;
S
Shengliang Guan 已提交
411 412
  int32_t commentSize;
  int32_t codeSize;
L
Liu Jicong 已提交
413 414
  char*   pComment;
  char*   pCode;
S
Shengliang Guan 已提交
415 416
} SFuncObj;

S
Shengliang Guan 已提交
417
typedef struct {
L
Liu Jicong 已提交
418 419 420 421 422 423 424
  int64_t        id;
  int8_t         type;
  int8_t         replica;
  int16_t        numOfColumns;
  int32_t        numOfRows;
  void*          pIter;
  SMnode*        pMnode;
425
  STableMetaRsp* pMeta;
L
Liu Jicong 已提交
426 427
  bool           sysDbRsp;
  char           db[TSDB_DB_FNAME_LEN];
S
Shengliang Guan 已提交
428 429
} SShowObj;

430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446
// System-table retrieve state: id, type, replica, column/row bookkeeping, an opaque
// iterator, the owning mnode, the db name, per-column offset and byte-width tables,
// and a trailing variable-length payload.
typedef struct {
  int64_t id;
  int8_t  type;
  int8_t  replica;
  int16_t numOfColumns;
  int32_t rowSize;
  int32_t numOfRows;
  int32_t numOfReads;
  int32_t payloadLen;  // NOTE(review): presumably the byte length of payload[] — confirm
  void*   pIter;
  SMnode* pMnode;
  char    db[TSDB_DB_FNAME_LEN];
  int16_t offset[TSDB_MAX_COLUMNS];
  int32_t bytes[TSDB_MAX_COLUMNS];
  char    payload[];  // flexible array member; must remain the last field
} SSysTableRetrieveObj;

L
Liu Jicong 已提交
447 448
// Message-queue offset record: partition key, db uid and the offset value.
typedef struct {
  char    key[TSDB_PARTITION_KEY_LEN];
  int64_t dbUid;
  int64_t offset;
} SMqOffsetObj;

S
Shengliang Guan 已提交
453 454
// Serialization pair for SMqOffsetObj.
// NOTE(review): only the signatures are visible here. Presumably the encoder writes
// through *buf and returns a byte count, and the decoder returns the advanced read
// position — confirm against the implementation. The decoder takes a non-const
// `void* buf`, unlike the other tDecode* prototypes in this header.
int32_t tEncodeSMqOffsetObj(void** buf, const SMqOffsetObj* pOffset);
void*   tDecodeSMqOffsetObj(void* buf, SMqOffsetObj* pOffset);
L
Liu Jicong 已提交
455

L
Liu Jicong 已提交
456
typedef struct {
L
Liu Jicong 已提交
457 458 459 460 461
  char           name[TSDB_TOPIC_FNAME_LEN];
  char           db[TSDB_DB_FNAME_LEN];
  int64_t        createTime;
  int64_t        updateTime;
  int64_t        uid;
L
Liu Jicong 已提交
462 463
  int64_t        dbUid;
  int32_t        version;
L
Liu Jicong 已提交
464 465 466 467
  int8_t         subType;  // db or table
  int8_t         withTbName;
  int8_t         withSchema;
  int8_t         withTag;
L
Liu Jicong 已提交
468
  SRWLatch       lock;
L
Liu Jicong 已提交
469
  int32_t        consumerCnt;
L
Liu Jicong 已提交
470
  int32_t        sqlLen;
L
Liu Jicong 已提交
471
  int32_t        astLen;
L
Liu Jicong 已提交
472
  char*          sql;
L
Liu Jicong 已提交
473
  char*          ast;
L
Liu Jicong 已提交
474 475
  char*          physicalPlan;
  SSchemaWrapper schema;
L
Liu Jicong 已提交
476
  // int32_t        refConsumerCnt;
L
Liu Jicong 已提交
477 478
} SMqTopicObj;

L
Liu Jicong 已提交
479
typedef struct {
S
Shengliang Guan 已提交
480 481
  int64_t  consumerId;
  char     cgroup[TSDB_CGROUP_LEN];
L
Liu Jicong 已提交
482
  char     clientId[256];
S
Shengliang Guan 已提交
483 484 485 486 487
  int8_t   updateType;  // used only for update
  int32_t  epoch;
  int32_t  status;
  int32_t  hbStatus;          // hbStatus is not applicable to serialization
  SRWLatch lock;              // lock is used for topics update
L
Liu Jicong 已提交
488 489 490
  SArray*  currentTopics;     // SArray<char*>
  SArray*  rebNewTopics;      // SArray<char*>
  SArray*  rebRemovedTopics;  // SArray<char*>
L
Liu Jicong 已提交
491

L
Liu Jicong 已提交
492 493 494
  // subscribed by user
  SArray* assignedTopics;  // SArray<char*>

L
Liu Jicong 已提交
495 496 497 498 499 500
  // data for display
  int32_t pid;
  SEpSet  ep;
  int64_t upTime;
  int64_t subscribeTime;
  int64_t rebalanceTime;
L
Liu Jicong 已提交
501 502
} SMqConsumerObj;

503 504 505 506
// Constructor, destructor and serialization functions for SMqConsumerObj.
// The array bound on `cgroup` is informational only: the parameter has type char*.
// NOTE(review): only the signatures are visible here. Presumably tNew* returns a
// newly allocated object that tDelete* releases, the encoder returns a byte count, and
// the decoder returns the advanced read position — confirm against the implementation.
SMqConsumerObj* tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_LEN]);
void            tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer);
int32_t         tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer);
void*           tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer);
L
Liu Jicong 已提交
507

508 509 510
typedef struct {
  int32_t vgId;
  char*   qmsg;
L
Liu Jicong 已提交
511
  SEpSet  epSet;
512 513 514 515 516 517 518 519 520 521
} SMqVgEp;

// Clone, destructor and serialization functions for SMqVgEp.
// NOTE(review): only the signatures are visible here. Presumably tClone* returns a
// newly allocated copy that tDelete* releases, the encoder returns a byte count, and
// the decoder returns the advanced read position — confirm against the implementation.
SMqVgEp* tCloneSMqVgEp(const SMqVgEp* pVgEp);
void     tDeleteSMqVgEp(SMqVgEp* pVgEp);
int32_t  tEncodeSMqVgEp(void** buf, const SMqVgEp* pVgEp);
void*    tDecodeSMqVgEp(const void* buf, SMqVgEp* pVgEp);

// A consumer together with the vgroup endpoints assigned to it.
typedef struct {
  int64_t consumerId;  // -1 for unassigned
  SArray* vgs;         // SArray<SMqVgEp*>
} SMqConsumerEp;
523

524 525 526 527
// Clone, destructor and serialization functions for SMqConsumerEp.
// NOTE(review): only the signatures are visible here. Presumably tClone* returns a
// newly allocated copy that tDelete* releases, the encoder returns a byte count, and
// the decoder returns the advanced read position — confirm against the implementation.
SMqConsumerEp* tCloneSMqConsumerEp(const SMqConsumerEp* pEp);
void           tDeleteSMqConsumerEp(SMqConsumerEp* pEp);
int32_t        tEncodeSMqConsumerEp(void** buf, const SMqConsumerEp* pEp);
void*          tDecodeSMqConsumerEp(const void* buf, SMqConsumerEp* pEp);
528 529 530 531

typedef struct {
  char      key[TSDB_SUBSCRIBE_KEY_LEN];
  SRWLatch  lock;
L
Liu Jicong 已提交
532
  int64_t   dbUid;
533
  int32_t   vgNum;
L
Liu Jicong 已提交
534 535 536 537
  int8_t    subType;
  int8_t    withTbName;
  int8_t    withSchema;
  int8_t    withTag;
538 539
  SHashObj* consumerHash;   // consumerId -> SMqConsumerEp
  SArray*   unassignedVgs;  // SArray<SMqVgEp*>
540 541 542 543 544 545 546 547 548 549
} SMqSubscribeObj;

// Constructor, clone, destructor and serialization functions for SMqSubscribeObj.
// The array bound on `key` is informational only: the parameter has type const char*.
// NOTE(review): only the signatures are visible here. Presumably tNew*/tClone* return
// newly allocated objects that tDelete* releases, the encoder returns a byte count, and
// the decoder returns the advanced read position — confirm against the implementation.
SMqSubscribeObj* tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]);
SMqSubscribeObj* tCloneSubscribeObj(const SMqSubscribeObj* pSub);
void             tDeleteSubscribeObj(SMqSubscribeObj* pSub);
int32_t          tEncodeSubscribeObj(void** buf, const SMqSubscribeObj* pSub);
void*            tDecodeSubscribeObj(const void* buf, SMqSubscribeObj* pSub);

typedef struct {
  int32_t epoch;
550
  SArray* consumers;  // SArray<SMqConsumerEp*>
551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568
} SMqSubActionLogEntry;

// Clone, destructor and serialization functions for SMqSubActionLogEntry.
// The clone function takes a non-const source pointer, unlike tCloneSMqVgEp and
// tCloneSMqConsumerEp.
// NOTE(review): only the signatures are visible here. Presumably tClone* returns a
// newly allocated copy that tDelete* releases, the encoder returns a byte count, and
// the decoder returns the advanced read position — confirm against the implementation.
SMqSubActionLogEntry* tCloneSMqSubActionLogEntry(SMqSubActionLogEntry* pEntry);
void                  tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry* pEntry);
int32_t               tEncodeSMqSubActionLogEntry(void** buf, const SMqSubActionLogEntry* pEntry);
void*                 tDecodeSMqSubActionLogEntry(const void* buf, SMqSubActionLogEntry* pEntry);

// Subscription action log: a subscribe key and the list of log entries recorded for it.
typedef struct {
  char    key[TSDB_SUBSCRIBE_KEY_LEN];
  SArray* logs;  // SArray<SMqSubActionLogEntry*>
} SMqSubActionLogObj;

// Clone, destructor and serialization functions for SMqSubActionLogObj.
// The clone function takes a non-const source pointer.
// NOTE(review): only the signatures are visible here. Presumably tClone* returns a
// newly allocated copy that tDelete* releases, the encoder returns a byte count, and
// the decoder returns the advanced read position — confirm against the implementation.
SMqSubActionLogObj* tCloneSMqSubActionLogObj(SMqSubActionLogObj* pLog);
void                tDeleteSMqSubActionLogObj(SMqSubActionLogObj* pLog);
int32_t             tEncodeSMqSubActionLogObj(void** buf, const SMqSubActionLogObj* pLog);
void*               tDecodeSMqSubActionLogObj(const void* buf, SMqSubActionLogObj* pLog);

typedef struct {
L
Liu Jicong 已提交
569 570
  int32_t           oldConsumerNum;
  const SMqRebInfo* pRebInfo;
571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586
} SMqRebInputObj;

// One vgroup move produced by a rebalance: the old and new consumer ids and the
// vgroup endpoint concerned. Element type of SMqRebOutputObj.rebVgs.
typedef struct {
  int64_t  oldConsumerId;
  int64_t  newConsumerId;
  SMqVgEp* pVgEp;
} SMqRebOutputVg;

// Rebalance output: the vgroup moves, the new/removed/touched consumer id lists, the
// subscription object and the action log entry.
typedef struct {
  SArray*               rebVgs;            // SArray<SMqRebOutputVg>
  SArray*               newConsumers;      // SArray<int64_t>
  SArray*               removedConsumers;  // SArray<int64_t>
  SArray*               touchedConsumers;  // SArray<int64_t>
  SMqSubscribeObj*      pSub;
  SMqSubActionLogEntry* pLogEntry;
} SMqRebOutputObj;
L
Liu Jicong 已提交
587

L
Liu Jicong 已提交
588
typedef struct {
S
Shengliang Guan 已提交
589 590 591 592
  char           name[TSDB_TOPIC_FNAME_LEN];
  char           sourceDb[TSDB_DB_FNAME_LEN];
  char           targetDb[TSDB_DB_FNAME_LEN];
  char           targetSTbName[TSDB_TABLE_FNAME_LEN];
L
Liu Jicong 已提交
593
  int64_t        targetStbUid;
S
Shengliang Guan 已提交
594 595 596 597 598 599 600 601
  int64_t        createTime;
  int64_t        updateTime;
  int64_t        uid;
  int64_t        dbUid;
  int32_t        version;
  int32_t        vgNum;
  SRWLatch       lock;
  int8_t         status;
L
Liu Jicong 已提交
602 603
  int8_t         createdBy;      // STREAM_CREATED_BY__USER or SMA
  int32_t        fixedSinkVgId;  // 0 for shuffle
L
Liu Jicong 已提交
604 605
  SVgObj         fixedSinkVg;
  int64_t        smaId;  // 0 for unused
L
fix  
Liu Jicong 已提交
606 607 608
  int8_t         trigger;
  int32_t        triggerParam;
  int64_t        waterMark;
L
Liu Jicong 已提交
609 610 611 612
  char*          sql;
  char*          physicalPlan;
  SArray*        tasks;  // SArray<SArray<SStreamTask>>
  SSchemaWrapper outputSchema;
L
Liu Jicong 已提交
613 614
} SStreamObj;

H
Hongze Cheng 已提交
615 616
// Serialization pair for SStreamObj. Unlike the void*-buffer tEncode*/tDecode*
// prototypes in this header, these take SEncoder/SDecoder objects and both return
// int32_t.
// NOTE(review): presumably the return value is a status code (0 on success) — confirm
// against the implementation.
int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj);
int32_t tDecodeSStreamObj(SDecoder* pDecoder, SStreamObj* pObj);
L
Liu Jicong 已提交
617

S
Shengliang Guan 已提交
618 619 620
#ifdef __cplusplus
}
#endif
S
Shengliang Guan 已提交
621

S
Shengliang Guan 已提交
622
#endif /*_TD_MND_DEF_H_*/