catalogInt.h 23.5 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#ifndef _TD_CATALOG_INT_H_
#define _TD_CATALOG_INT_H_

#ifdef __cplusplus
extern "C" {
#endif

H
Haojun Liao 已提交
23
#include "catalog.h"
S
common  
Shengliang Guan 已提交
24
#include "tcommon.h"
D
dapan1121 已提交
25
#include "query.h"
H
Haojun Liao 已提交
26

D
dapan1121 已提交
27 28 29
#define CTG_DEFAULT_CACHE_CLUSTER_NUMBER 6
#define CTG_DEFAULT_CACHE_VGROUP_NUMBER 100
#define CTG_DEFAULT_CACHE_DB_NUMBER 20
D
dapan1121 已提交
30
#define CTG_DEFAULT_CACHE_TBLMETA_NUMBER 1000
D
dapan1121 已提交
31 32
#define CTG_DEFAULT_RENT_SECOND 10
#define CTG_DEFAULT_RENT_SLOT_SIZE 10
D
dapan1121 已提交
33
#define CTG_DEFAULT_MAX_RETRY_TIMES 3
D
dapan1121 已提交
34

D
dapan1121 已提交
35
#define CTG_RENT_SLOT_SECOND 1.5
D
dapan1121 已提交
36 37

#define CTG_DEFAULT_INVALID_VERSION (-1)
D
dapan 已提交
38

D
dapan1121 已提交
39
#define CTG_ERR_CODE_TABLE_NOT_EXIST TSDB_CODE_PAR_TABLE_NOT_EXIST
D
dapan1121 已提交
40

D
dapan1121 已提交
41 42 43 44 45
enum {
  CTG_READ = 1,
  CTG_WRITE,
};

D
dapan1121 已提交
46 47 48 49 50
enum {
  CTG_RENT_DB = 1,
  CTG_RENT_STABLE,
};

D
dapan1121 已提交
51
enum {
D
dapan1121 已提交
52 53 54
  CTG_OP_UPDATE_VGROUP = 0,
  CTG_OP_UPDATE_TB_META,
  CTG_OP_DROP_DB_CACHE,
D
dapan1121 已提交
55
  CTG_OP_DROP_DB_VGROUP,
D
dapan1121 已提交
56 57 58 59
  CTG_OP_DROP_STB_META,
  CTG_OP_DROP_TB_META,
  CTG_OP_UPDATE_USER,
  CTG_OP_UPDATE_VG_EPSET,
D
dapan1121 已提交
60 61
  CTG_OP_UPDATE_TB_INDEX,
  CTG_OP_DROP_TB_INDEX,
D
dapan1121 已提交
62
  CTG_OP_CLEAR_CACHE,
D
dapan1121 已提交
63
  CTG_OP_MAX
D
dapan1121 已提交
64 65
};

D
dapan1121 已提交
66 67
typedef enum {
  CTG_TASK_GET_QNODE = 0,
D
dapan1121 已提交
68
  CTG_TASK_GET_DNODE,
D
dapan1121 已提交
69 70
  CTG_TASK_GET_DB_VGROUP,
  CTG_TASK_GET_DB_CFG,
D
dapan1121 已提交
71
  CTG_TASK_GET_DB_INFO,
D
dapan1121 已提交
72 73
  CTG_TASK_GET_TB_META,
  CTG_TASK_GET_TB_HASH,
D
dapan1121 已提交
74
  CTG_TASK_GET_TB_INDEX,
D
dapan1121 已提交
75
  CTG_TASK_GET_TB_CFG,
D
dapan1121 已提交
76 77 78
  CTG_TASK_GET_INDEX,
  CTG_TASK_GET_UDF,
  CTG_TASK_GET_USER,
D
dapan1121 已提交
79
  CTG_TASK_GET_SVR_VER,
D
dapan1121 已提交
80 81
} CTG_TASK_TYPE;

D
dapan1121 已提交
82 83 84 85 86 87
typedef enum {
  CTG_TASK_LAUNCHED = 1,
  CTG_TASK_DONE,
} CTG_TASK_STATUS;


D
dapan1121 已提交
88
typedef struct SCtgDebug {
D
dapan1121 已提交
89 90 91 92
  bool     lockEnable;
  bool     cacheEnable;
  bool     apiEnable;
  bool     metaEnable;
D
dapan1121 已提交
93
  uint32_t showCachePeriodSec;
D
dapan1121 已提交
94
} SCtgDebug;
95

D
dapan1121 已提交
96 97 98 99 100 101 102 103 104
typedef struct SCtgTbCacheInfo {
  bool     inCache;
  uint64_t dbId;
  uint64_t suid;
  int32_t  tbType;
} SCtgTbCacheInfo;

typedef struct SCtgTbMetaCtx {
  SCtgTbCacheInfo tbInfo;
D
dapan1121 已提交
105
  int32_t vgId;
D
dapan1121 已提交
106 107 108 109
  SName* pName;
  int32_t flag;
} SCtgTbMetaCtx;

D
dapan1121 已提交
110 111 112 113
typedef struct SCtgTbIndexCtx {
  SName* pName;
} SCtgTbIndexCtx;

D
dapan1121 已提交
114 115 116 117 118 119
typedef struct SCtgTbCfgCtx {
  SName*       pName;
  int32_t      tbType;
  SVgroupInfo* pVgInfo;
} SCtgTbCfgCtx;

D
dapan1121 已提交
120 121 122 123 124 125 126 127
typedef struct SCtgDbVgCtx {
  char dbFName[TSDB_DB_FNAME_LEN];
} SCtgDbVgCtx;

typedef struct SCtgDbCfgCtx {
  char dbFName[TSDB_DB_FNAME_LEN];
} SCtgDbCfgCtx;

D
dapan1121 已提交
128 129 130 131
typedef struct SCtgDbInfoCtx {
  char dbFName[TSDB_DB_FNAME_LEN];
} SCtgDbInfoCtx;

D
dapan1121 已提交
132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
typedef struct SCtgTbHashCtx {
  char dbFName[TSDB_DB_FNAME_LEN];
  SName* pName;
} SCtgTbHashCtx;

typedef struct SCtgIndexCtx {
  char indexFName[TSDB_INDEX_FNAME_LEN];
} SCtgIndexCtx;

typedef struct SCtgUdfCtx {
  char udfName[TSDB_FUNC_NAME_LEN];
} SCtgUdfCtx;

typedef struct SCtgUserCtx {
  SUserAuthInfo user;
} SCtgUserCtx;
148

D
dapan1121 已提交
149
typedef STableIndexRsp STableIndex;
D
dapan1121 已提交
150

D
dapan1121 已提交
151 152 153 154 155 156 157 158
typedef struct SCtgTbCache {
  SRWLatch     metaLock;
  STableMeta  *pMeta;
  SRWLatch     indexLock;
  STableIndex *pIndex;
} SCtgTbCache;

typedef struct SCtgVgCache {
D
dapan1121 已提交
159
  SRWLatch         vgLock;
D
dapan1121 已提交
160 161 162 163 164
  SDBVgInfo       *vgInfo;  
} SCtgVgCache;

typedef struct SCtgDBCache {
  SRWLatch         dbLock;          // RC between destroy tbCache/stbCache and all reads
D
dapan1121 已提交
165
  uint64_t         dbId;
D
dapan1121 已提交
166
  int8_t           deleted;
D
dapan1121 已提交
167 168
  SCtgVgCache      vgCache;
  SHashObj        *tbCache;         // key:tbname, value:SCtgTbCache
D
dapan1121 已提交
169
  SHashObj        *stbCache;        // key:suid, value:char*
D
dapan1121 已提交
170
} SCtgDBCache;
D
dapan1121 已提交
171

D
dapan1121 已提交
172
typedef struct SCtgRentSlot {
D
dapan1121 已提交
173 174
  SRWLatch lock;
  bool     needSort;
D
dapan1121 已提交
175
  SArray  *meta;  // element is SDbVgVersion or SSTableVersion
D
dapan1121 已提交
176
} SCtgRentSlot;
D
dapan1121 已提交
177

D
dapan1121 已提交
178
typedef struct SCtgRentMgmt {
D
dapan1121 已提交
179 180 181 182
  int8_t         type;
  uint16_t       slotNum;
  uint16_t       slotRIdx;
  int64_t        lastReadMsec;
D
dapan1121 已提交
183 184
  SCtgRentSlot  *slots;
} SCtgRentMgmt;
D
dapan1121 已提交
185

D
dapan 已提交
186 187 188 189 190 191 192 193 194
typedef struct SCtgUserAuth {
  int32_t   version;
  SRWLatch  lock;
  bool      superUser;
  SHashObj *createdDbs;
  SHashObj *readDbs;
  SHashObj *writeDbs;
} SCtgUserAuth;

D
dapan1121 已提交
195
typedef struct SCatalog {
D
dapan1121 已提交
196
  uint64_t         clusterId;  
D
dapan 已提交
197
  SHashObj        *userCache;    //key:user, value:SCtgUserAuth
D
dapan1121 已提交
198 199 200
  SHashObj        *dbCache;      //key:dbname, value:SCtgDBCache
  SCtgRentMgmt     dbRent;
  SCtgRentMgmt     stbRent;
H
Haojun Liao 已提交
201 202
} SCatalog;

D
dapan1121 已提交
203
typedef struct SCtgJob {
D
dapan1121 已提交
204 205 206 207
  int64_t           refId;
  SArray*           pTasks;
  int32_t           taskDone;
  SMetaData         jobRes;
D
dapan1121 已提交
208
  int32_t           jobResCode;
D
dapan1121 已提交
209 210 211
  int32_t           taskIdx;
  SRWLatch          taskLock;
  
D
dapan1121 已提交
212 213 214 215 216 217 218 219 220 221
  uint64_t          queryId;
  SCatalog*         pCtg; 
  SRequestConnInfo  conn;
  void*             userParam;
  catalogCallback   userFp;
  int32_t           tbMetaNum;
  int32_t           tbHashNum;
  int32_t           dbVgNum;
  int32_t           udfNum;
  int32_t           qnodeNum;
D
dapan1121 已提交
222
  int32_t           dnodeNum;
D
dapan1121 已提交
223 224 225 226 227
  int32_t           dbCfgNum;
  int32_t           indexNum;
  int32_t           userNum;
  int32_t           dbInfoNum;
  int32_t           tbIndexNum;
D
dapan1121 已提交
228
  int32_t           tbCfgNum;
D
dapan1121 已提交
229
  int32_t           svrVerNum;
D
dapan1121 已提交
230 231 232 233 234 235 236 237 238
} SCtgJob;

typedef struct SCtgMsgCtx {
  int32_t reqType;
  void* lastOut;
  void* out;
  char* target;
} SCtgMsgCtx;

D
dapan1121 已提交
239 240 241 242 243 244 245 246 247 248
typedef struct SCtgTask SCtgTask;
typedef int32_t (*ctgSubTaskCbFp)(SCtgTask*);

typedef struct SCtgSubRes {
  CTG_TASK_TYPE  type;
  int32_t        code;
  void*          res;
  ctgSubTaskCbFp fp;
} SCtgSubRes;

D
dapan1121 已提交
249
typedef struct SCtgTask {
D
dapan1121 已提交
250 251 252 253 254 255 256 257 258 259 260
  CTG_TASK_TYPE   type;
  int32_t         taskId;
  SCtgJob*        pJob;
  void*           taskCtx;
  SCtgMsgCtx      msgCtx;
  int32_t         code;
  void*           res;
  CTG_TASK_STATUS status;
  SRWLatch        lock;
  SArray*         pParents;
  SCtgSubRes      subRes;
D
dapan1121 已提交
261 262
} SCtgTask;

D
dapan1121 已提交
263
typedef int32_t (*ctgInitTaskFp)(SCtgJob*, int32_t, void*);
D
dapan1121 已提交
264 265 266
typedef int32_t (*ctgLanchTaskFp)(SCtgTask*);
typedef int32_t (*ctgHandleTaskMsgRspFp)(SCtgTask*, int32_t, const SDataBuf *, int32_t);
typedef int32_t (*ctgDumpTaskResFp)(SCtgTask*);
D
dapan1121 已提交
267 268
typedef int32_t (*ctgCloneTaskResFp)(SCtgTask*, void**);
typedef int32_t (*ctgCompTaskFp)(SCtgTask*, void*, bool*);
D
dapan1121 已提交
269 270

typedef struct SCtgAsyncFps {
D
dapan1121 已提交
271 272
  ctgInitTaskFp         initFp;
  ctgLanchTaskFp        launchFp;
D
dapan1121 已提交
273
  ctgHandleTaskMsgRspFp handleRspFp;
D
dapan1121 已提交
274 275 276
  ctgDumpTaskResFp      dumpResFp;
  ctgCompTaskFp         compFp;
  ctgCloneTaskResFp     cloneFp;
D
dapan1121 已提交
277 278
} SCtgAsyncFps;

D
dapan1121 已提交
279 280
typedef struct SCtgApiStat {

wafwerar's avatar
wafwerar 已提交
281
#if defined(WINDOWS) || defined(_TD_DARWIN_64)
wafwerar's avatar
wafwerar 已提交
282 283 284
  size_t avoidCompilationErrors;
#endif

D
dapan1121 已提交
285 286
} SCtgApiStat;

D
dapan1121 已提交
287
typedef struct SCtgRuntimeStat {
D
dapan1121 已提交
288 289 290
  uint64_t numOfOpAbort;
  uint64_t numOfOpEnqueue;
  uint64_t numOfOpDequeue;
D
dapan1121 已提交
291
} SCtgRuntimeStat;
D
dapan1121 已提交
292 293

typedef struct SCtgCacheStat {
D
dapan1121 已提交
294 295 296 297 298 299 300 301 302 303 304 305 306 307
  uint64_t numOfCluster;
  uint64_t numOfDb;
  uint64_t numOfTbl;
  uint64_t numOfStb;
  uint64_t numOfUser;
  uint64_t numOfVgHit;
  uint64_t numOfVgMiss;
  uint64_t numOfMetaHit;
  uint64_t numOfMetaMiss;
  uint64_t numOfIndexHit;
  uint64_t numOfIndexMiss;  
  uint64_t numOfUserHit;
  uint64_t numOfUserMiss;
  uint64_t numOfClear;
D
dapan1121 已提交
308 309 310 311
} SCtgCacheStat;

typedef struct SCatalogStat {
  SCtgApiStat      api;
D
dapan1121 已提交
312
  SCtgRuntimeStat  runtime;
D
dapan1121 已提交
313 314 315
  SCtgCacheStat    cache;
} SCatalogStat;

D
dapan1121 已提交
316 317 318 319
typedef struct SCtgUpdateMsgHeader {
  SCatalog* pCtg;
} SCtgUpdateMsgHeader;

D
dapan1121 已提交
320 321 322 323 324 325 326
typedef struct SCtgUpdateVgMsg {
  SCatalog* pCtg;
  char  dbFName[TSDB_DB_FNAME_LEN];
  uint64_t dbId;
  SDBVgInfo* dbInfo;
} SCtgUpdateVgMsg;

D
dapan1121 已提交
327 328 329 330
typedef struct SCtgUpdateTbMetaMsg {
  SCatalog*         pCtg;
  STableMetaOutput* pMeta;
} SCtgUpdateTbMetaMsg;
D
dapan1121 已提交
331

D
dapan1121 已提交
332
typedef struct SCtgDropDBMsg {
D
dapan1121 已提交
333 334 335
  SCatalog* pCtg;
  char  dbFName[TSDB_DB_FNAME_LEN];
  uint64_t dbId;
D
dapan1121 已提交
336
} SCtgDropDBMsg;
D
dapan1121 已提交
337

D
dapan1121 已提交
338 339 340 341 342 343 344
typedef struct SCtgDropDbVgroupMsg {
  SCatalog* pCtg;
  char  dbFName[TSDB_DB_FNAME_LEN];
} SCtgDropDbVgroupMsg;


typedef struct SCtgDropStbMetaMsg {
D
dapan1121 已提交
345 346 347 348 349
  SCatalog* pCtg;
  char  dbFName[TSDB_DB_FNAME_LEN];
  char  stbName[TSDB_TABLE_NAME_LEN];
  uint64_t dbId;
  uint64_t suid;
D
dapan1121 已提交
350
} SCtgDropStbMetaMsg;
D
dapan1121 已提交
351

D
dapan1121 已提交
352
typedef struct SCtgDropTblMetaMsg {
D
dapan1121 已提交
353 354 355 356
  SCatalog* pCtg;
  char  dbFName[TSDB_DB_FNAME_LEN];
  char  tbName[TSDB_TABLE_NAME_LEN];
  uint64_t dbId;
D
dapan1121 已提交
357
} SCtgDropTblMetaMsg;
D
dapan1121 已提交
358

D
dapan 已提交
359 360 361
typedef struct SCtgUpdateUserMsg {
  SCatalog* pCtg;
  SGetUserAuthRsp userAuth;
D
dapan 已提交
362
} SCtgUpdateUserMsg;
D
dapan 已提交
363

D
dapan1121 已提交
364 365 366 367 368 369 370 371 372 373 374
typedef struct SCtgUpdateTbIndexMsg {
  SCatalog*    pCtg;
  STableIndex* pIndex;
} SCtgUpdateTbIndexMsg;

typedef struct SCtgDropTbIndexMsg {
  SCatalog*    pCtg;
  char         dbFName[TSDB_DB_FNAME_LEN];
  char         tbName[TSDB_TABLE_NAME_LEN];
} SCtgDropTbIndexMsg;

D
dapan1121 已提交
375 376
typedef struct SCtgClearCacheMsg {
  SCatalog*    pCtg;
D
dapan1121 已提交
377
  bool         freeCtg;
D
dapan1121 已提交
378 379
} SCtgClearCacheMsg;

D
dapan1121 已提交
380 381 382 383 384 385
typedef struct SCtgUpdateEpsetMsg {
  SCatalog* pCtg;
  char  dbFName[TSDB_DB_FNAME_LEN];
  int32_t vgId;
  SEpSet  epSet;
} SCtgUpdateEpsetMsg;
D
dapan1121 已提交
386

D
dapan1121 已提交
387 388
typedef struct SCtgCacheOperation {
  int32_t  opId;
D
dapan1121 已提交
389
  void    *data;
D
dapan1121 已提交
390
  bool     syncOp;
D
dapan1121 已提交
391
  tsem_t   rspSem;  
D
dapan1121 已提交
392
  bool     stopQueue;
D
dapan1121 已提交
393
} SCtgCacheOperation;
D
dapan1121 已提交
394 395

typedef struct SCtgQNode {
D
dapan1121 已提交
396
  SCtgCacheOperation    *op;
D
dapan1121 已提交
397 398 399
  struct SCtgQNode      *next;
} SCtgQNode;

D
dapan1121 已提交
400
typedef struct SCtgQueue {
D
dapan1121 已提交
401
  SRWLatch              qlock;
D
dapan1121 已提交
402
  bool                  stopQueue;
D
dapan1121 已提交
403 404
  SCtgQNode            *head;
  SCtgQNode            *tail;
D
dapan1121 已提交
405
  tsem_t                reqSem;  
D
dapan 已提交
406
  uint64_t              qRemainNum;
D
dapan1121 已提交
407 408 409 410
} SCtgQueue;

typedef struct SCatalogMgmt {
  bool                  exit;
D
dapan1121 已提交
411
  int32_t               jobPool;
D
dapan1121 已提交
412 413
  SRWLatch              lock;
  SCtgQueue             queue;
D
dapan1121 已提交
414
  TdThread              updateThread;  
D
dapan1121 已提交
415 416 417
  SHashObj             *pCluster;     //key: clusterId, value: SCatalog*
  SCatalogStat          stat;
  SCatalogCfg           cfg;
D
dapan1121 已提交
418 419
} SCatalogMgmt;

D
dapan1121 已提交
420
typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
D
dapan1121 已提交
421
typedef int32_t (*ctgOpFunc)(SCtgCacheOperation *);
D
dapan 已提交
422

D
dapan1121 已提交
423 424
typedef struct SCtgOperation {
  int32_t    opId;
D
dapan 已提交
425
  char       name[32];
D
dapan1121 已提交
426 427
  ctgOpFunc func;
} SCtgOperation;
D
dapan 已提交
428

D
dapan1121 已提交
429 430
#define CTG_QUEUE_INC() atomic_add_fetch_64(&gCtgMgmt.queue.qRemainNum, 1)
#define CTG_QUEUE_DEC() atomic_sub_fetch_64(&gCtgMgmt.queue.qRemainNum, 1)
D
dapan 已提交
431

D
dapan1121 已提交
432 433
#define CTG_STAT_INC(_item, _n) atomic_add_fetch_64(&(_item), _n)
#define CTG_STAT_DEC(_item, _n) atomic_sub_fetch_64(&(_item), _n)
D
dapan1121 已提交
434
#define CTG_STAT_GET(_item) atomic_load_64(&(_item))
D
dapan1121 已提交
435

D
dapan1121 已提交
436 437 438
#define CTG_RT_STAT_INC(item, n) (CTG_STAT_INC(gCtgMgmt.stat.runtime.item, n))
#define CTG_CACHE_STAT_INC(item, n) (CTG_STAT_INC(gCtgMgmt.stat.cache.item, n))
#define CTG_CACHE_STAT_DEC(item, n) (CTG_STAT_DEC(gCtgMgmt.stat.cache.item, n))
D
dapan 已提交
439

D
dapan1121 已提交
440
#define CTG_IS_META_NULL(type) ((type) == META_TYPE_NULL_TABLE)
D
dapan1121 已提交
441 442 443 444
#define CTG_IS_META_CTABLE(type) ((type) == META_TYPE_CTABLE)
#define CTG_IS_META_TABLE(type) ((type) == META_TYPE_TABLE)
#define CTG_IS_META_BOTH(type) ((type) == META_TYPE_BOTH_TABLE)

D
dapan1121 已提交
445 446 447
#define CTG_FLAG_STB          0x1
#define CTG_FLAG_NOT_STB      0x2
#define CTG_FLAG_UNKNOWN_STB  0x4
D
dapan1121 已提交
448
#define CTG_FLAG_SYS_DB       0x8
D
dapan1121 已提交
449 450
#define CTG_FLAG_FORCE_UPDATE 0x10

D
dapan 已提交
451 452
#define CTG_FLAG_SET(_flag, _v) ((_flag) |= (_v))

D
dapan1121 已提交
453 454 455
#define CTG_FLAG_IS_STB(_flag) ((_flag) & CTG_FLAG_STB)
#define CTG_FLAG_IS_NOT_STB(_flag) ((_flag) & CTG_FLAG_NOT_STB)
#define CTG_FLAG_IS_UNKNOWN_STB(_flag) ((_flag) & CTG_FLAG_UNKNOWN_STB)
D
dapan1121 已提交
456
#define CTG_FLAG_IS_SYS_DB(_flag) ((_flag) & CTG_FLAG_SYS_DB)
D
dapan1121 已提交
457
#define CTG_FLAG_IS_FORCE_UPDATE(_flag) ((_flag) & CTG_FLAG_FORCE_UPDATE)
D
dapan1121 已提交
458
#define CTG_FLAG_SET_SYS_DB(_flag) ((_flag) |= CTG_FLAG_SYS_DB)
D
dapan1121 已提交
459 460 461
#define CTG_FLAG_SET_STB(_flag, tbType) do { (_flag) |= ((tbType) == TSDB_SUPER_TABLE) ? CTG_FLAG_STB : ((tbType) > TSDB_SUPER_TABLE ? CTG_FLAG_NOT_STB : CTG_FLAG_UNKNOWN_STB); } while (0)
#define CTG_FLAG_MAKE_STB(_isStb) (((_isStb) == 1) ? CTG_FLAG_STB : ((_isStb) == 0 ? CTG_FLAG_NOT_STB : CTG_FLAG_UNKNOWN_STB))
#define CTG_FLAG_MATCH_STB(_flag, tbType) (CTG_FLAG_IS_UNKNOWN_STB(_flag) || (CTG_FLAG_IS_STB(_flag) && (tbType) == TSDB_SUPER_TABLE) || (CTG_FLAG_IS_NOT_STB(_flag) && (tbType) != TSDB_SUPER_TABLE))
D
dapan1121 已提交
462

D
dapan1121 已提交
463
#define CTG_IS_SYS_DBNAME(_dbname) (((*(_dbname) == 'i') && (0 == strcmp(_dbname, TSDB_INFORMATION_SCHEMA_DB))) || ((*(_dbname) == 'p') && (0 == strcmp(_dbname, TSDB_PERFORMANCE_SCHEMA_DB))))
D
dapan1121 已提交
464

D
dapan1121 已提交
465 466
#define CTG_META_SIZE(pMeta) (sizeof(STableMeta) + ((pMeta)->tableInfo.numOfTags + (pMeta)->tableInfo.numOfColumns) * sizeof(SSchema))

D
dapan1121 已提交
467
#define CTG_TABLE_NOT_EXIST(code) (code == CTG_ERR_CODE_TABLE_NOT_EXIST) 
D
dapan1121 已提交
468
#define CTG_DB_NOT_EXIST(code) (code == TSDB_CODE_MND_DB_NOT_EXIST) 
D
dapan1121 已提交
469

D
dapan1121 已提交
470 471 472 473 474 475
#define ctgFatal(param, ...)  qFatal("CTG:%p " param, pCtg, __VA_ARGS__)
#define ctgError(param, ...)  qError("CTG:%p " param, pCtg, __VA_ARGS__)
#define ctgWarn(param, ...)   qWarn("CTG:%p " param, pCtg, __VA_ARGS__)
#define ctgInfo(param, ...)   qInfo("CTG:%p " param, pCtg, __VA_ARGS__)
#define ctgDebug(param, ...)  qDebug("CTG:%p " param, pCtg, __VA_ARGS__)
#define ctgTrace(param, ...)  qTrace("CTG:%p " param, pCtg, __VA_ARGS__)
D
dapan 已提交
476

D
dapan1121 已提交
477 478 479
#define CTG_LOCK_DEBUG(...) do { if (gCTGDebug.lockEnable) { qDebug(__VA_ARGS__); } } while (0)
#define CTG_CACHE_DEBUG(...) do { if (gCTGDebug.cacheEnable) { qDebug(__VA_ARGS__); } } while (0)
#define CTG_API_DEBUG(...) do { if (gCTGDebug.apiEnable) { qDebug(__VA_ARGS__); } } while (0)
480

D
dapan1121 已提交
481 482
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000

D
dapan1121 已提交
483 484
#define CTG_LOCK(type, _lock) do {   \
  if (CTG_READ == (type)) {          \
485 486
    assert(atomic_load_32((_lock)) >= 0);  \
    CTG_LOCK_DEBUG("CTG RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
D
dapan1121 已提交
487
    taosRLockLatch(_lock);           \
488 489
    CTG_LOCK_DEBUG("CTG RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
    assert(atomic_load_32((_lock)) > 0);  \
D
dapan1121 已提交
490
  } else {                                                \
491 492
    assert(atomic_load_32((_lock)) >= 0);  \
    CTG_LOCK_DEBUG("CTG WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__);  \
D
dapan1121 已提交
493
    taosWLockLatch(_lock);                                \
494 495
    CTG_LOCK_DEBUG("CTG WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__);  \
    assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY);  \
D
dapan1121 已提交
496 497 498 499 500
  }                                                       \
} while (0)

#define CTG_UNLOCK(type, _lock) do {                       \
  if (CTG_READ == (type)) {                                \
501 502
    assert(atomic_load_32((_lock)) > 0);  \
    CTG_LOCK_DEBUG("CTG RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
D
dapan1121 已提交
503
    taosRUnLockLatch(_lock);                              \
504 505
    CTG_LOCK_DEBUG("CTG RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
    assert(atomic_load_32((_lock)) >= 0);  \
D
dapan1121 已提交
506
  } else {                                                \
507 508
    assert(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY);  \
    CTG_LOCK_DEBUG("CTG WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
D
dapan1121 已提交
509
    taosWUnLockLatch(_lock);                              \
510 511
    CTG_LOCK_DEBUG("CTG WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
    assert(atomic_load_32((_lock)) >= 0);  \
D
dapan1121 已提交
512 513 514
  }                                                       \
} while (0)

D
dapan1121 已提交
515 516 517 518 519
  
#define CTG_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)

520 521 522 523 524 525 526 527 528 529 530 531 532 533
#define CTG_API_LEAVE(c) do {                             \
  int32_t __code = c;                                     \
  CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock);                   \
  CTG_API_DEBUG("CTG API leave %s", __FUNCTION__);        \
  CTG_RET(__code);                                        \
} while (0)

#define CTG_API_ENTER() do {                              \
  CTG_API_DEBUG("CTG API enter %s", __FUNCTION__);        \
  CTG_LOCK(CTG_READ, &gCtgMgmt.lock);                     \
  if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) {           \
    CTG_API_LEAVE(TSDB_CODE_CTG_OUT_OF_SERVICE);          \
  }                                                       \
} while (0)
D
dapan1121 已提交
534

D
dapan1121 已提交
535 536 537 538 539 540 541 542 543 544 545 546 547 548 549

#define CTG_API_LEAVE_NOLOCK(c) do {                        \
    int32_t __code = c;                                     \
    CTG_API_DEBUG("CTG API leave %s", __FUNCTION__);        \
    CTG_RET(__code);                                        \
  } while (0)

#define CTG_API_ENTER_NOLOCK() do {                       \
  CTG_API_DEBUG("CTG API enter %s", __FUNCTION__);        \
  if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) {           \
    CTG_API_LEAVE_NOLOCK(TSDB_CODE_CTG_OUT_OF_SERVICE);   \
  }                                                       \
} while (0)


D
dapan1121 已提交
550 551
void    ctgdShowTableMeta(SCatalog* pCtg, const char *tbName, STableMeta* p);
void    ctgdShowClusterCache(SCatalog* pCtg);
D
dapan1121 已提交
552 553 554
int32_t ctgdShowCacheInfo(void);

int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq);
D
dapan1121 已提交
555
int32_t ctgGetTbMetaFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta);
D
dapan1121 已提交
556

D
dapan1121 已提交
557 558 559
int32_t ctgOpUpdateVgroup(SCtgCacheOperation *action);
int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *action);
int32_t ctgOpDropDbCache(SCtgCacheOperation *action);
D
dapan1121 已提交
560
int32_t ctgOpDropDbVgroup(SCtgCacheOperation *action);
D
dapan1121 已提交
561 562 563 564
int32_t ctgOpDropStbMeta(SCtgCacheOperation *action);
int32_t ctgOpDropTbMeta(SCtgCacheOperation *action);
int32_t ctgOpUpdateUser(SCtgCacheOperation *action);
int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation);
D
dapan1121 已提交
565
int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache);
D
dapan1121 已提交
566 567
void    ctgReleaseDBCache(SCatalog *pCtg, SCtgDBCache *dbCache);
void    ctgRUnlockVgInfo(SCtgDBCache *dbCache);
D
dapan1121 已提交
568 569
int32_t ctgTbMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32_t *exist);
int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta);
D
dapan1121 已提交
570 571
int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver, int32_t *tver, int32_t *tbType, uint64_t *suid, char *stbName);
int32_t ctgChkAuthFromCache(SCatalog* pCtg, char* user, char* dbFName, AUTH_TYPE type, bool *inCache, bool *pass);
D
dapan1121 已提交
572
int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId);
D
dapan1121 已提交
573
int32_t ctgDropDbVgroupEnqueue(SCatalog* pCtg, const char *dbFName, bool syncReq);
D
dapan1121 已提交
574 575 576 577 578 579
int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncReq);
int32_t ctgDropTbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncReq);
int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, SDBVgInfo* dbInfo, bool syncReq);
int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool syncReq);
int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncReq);
int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEpSet* pEpSet);
D
dapan1121 已提交
580
int32_t ctgUpdateTbIndexEnqueue(SCatalog* pCtg, STableIndex **pIndex, bool syncOp);
D
dapan1121 已提交
581
int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool freeCtg, bool stopQueue, bool syncOp);
D
dapan1121 已提交
582 583 584 585 586 587
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type);
int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size);
int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size);
int32_t ctgUpdateTbMetaToCache(SCatalog* pCtg, STableMetaOutput* pOut, bool syncReq);
int32_t ctgStartUpdateThread();
int32_t ctgRelaunchGetTbMetaTask(SCtgTask *pTask);
D
dapan1121 已提交
588
void    ctgReleaseVgInfoToCache(SCatalog* pCtg, SCtgDBCache *dbCache);
D
dapan1121 已提交
589 590 591 592
int32_t ctgReadTbIndexFromCache(SCatalog* pCtg, SName* pTableName, SArray** pRes);
int32_t ctgDropTbIndexEnqueue(SCatalog* pCtg, SName* pName, bool syncOp);
int32_t ctgOpDropTbIndex(SCtgCacheOperation *operation);
int32_t ctgOpUpdateTbIndex(SCtgCacheOperation *operation);
D
dapan1121 已提交
593
int32_t ctgOpClearCache(SCtgCacheOperation *operation);
D
dapan1121 已提交
594
int32_t ctgReadTbTypeFromCache(SCatalog* pCtg, char* dbFName, char *tableName, int32_t *tbType);
D
dapan1121 已提交
595
int32_t ctgGetTbHashVgroupFromCache(SCatalog *pCtg, const SName *pTableName, SVgroupInfo **pVgroup);
D
dapan1121 已提交
596

D
dapan1121 已提交
597 598 599 600



int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, int32_t rspCode, char* target);
D
dapan1121 已提交
601 602
int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildUseDBInput *input, SUseDbOutput *out, SCtgTask* pTask);
int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray *out, SCtgTask* pTask);
D
dapan1121 已提交
603
int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray **out, SCtgTask* pTask);
D
dapan1121 已提交
604 605
int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *dbFName, SDbCfgInfo *out, SCtgTask* pTask);
int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *indexName, SIndexInfo *out, SCtgTask* pTask);
D
dapan1121 已提交
606
int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *name, STableIndex* out, SCtgTask* pTask);
D
dapan1121 已提交
607 608 609 610 611
int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *funcName, SFuncInfo *out, SCtgTask* pTask);
int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *user, SGetUserAuthRsp *out, SCtgTask* pTask);
int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char *dbFName, char* tbName, STableMetaOutput* out, SCtgTask* pTask);
int32_t ctgGetTbMetaFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableMetaOutput* out, SCtgTask* pTask);
int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* out, SCtgTask* pTask);
D
dapan1121 已提交
612 613
int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableCfg **out, SCtgTask* pTask);
int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableCfg **out, SCtgTask* pTask);
D
dapan1121 已提交
614
int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **out, SCtgTask* pTask);
D
dapan1121 已提交
615

D
dapan1121 已提交
616
int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const SCatalogReq* pReq, catalogCallback fp, void* param);
D
dapan1121 已提交
617 618
int32_t ctgLaunchJob(SCtgJob *pJob);
int32_t ctgMakeAsyncRes(SCtgJob *pJob);
D
dapan1121 已提交
619 620
int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, void* param);
int32_t ctgGetTbCfgCb(SCtgTask *pTask);
D
dapan1121 已提交
621
void    ctgFreeHandle(SCatalog* pCatalog);
D
dapan1121 已提交
622 623 624 625

int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst);
int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput);
int32_t ctgGenerateVgList(SCatalog *pCtg, SHashObj *vgHash, SArray** pList);
D
dapan1121 已提交
626
void    ctgFreeJob(void* job);
D
dapan1121 已提交
627
void    ctgFreeHandleImpl(SCatalog* pCtg);
D
dapan1121 已提交
628
void    ctgFreeVgInfo(SDBVgInfo *vgInfo);
D
dapan1121 已提交
629
int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup);
D
dapan1121 已提交
630 631
void    ctgResetTbMetaTask(SCtgTask* pTask);
void    ctgFreeDbCache(SCtgDBCache *dbCache);
D
dapan1121 已提交
632 633 634 635
int32_t ctgStbVersionSortCompare(const void* key1, const void* key2);
int32_t ctgDbVgVersionSortCompare(const void* key1, const void* key2);
int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2);
int32_t ctgDbVgVersionSearchCompare(const void* key1, const void* key2);
D
dapan1121 已提交
636
void    ctgFreeSTableMetaOutput(STableMetaOutput* pOutput);
D
dapan1121 已提交
637
int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* target);
D
dapan1121 已提交
638
char *  ctgTaskTypeStr(CTG_TASK_TYPE type);
D
dapan1121 已提交
639
int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, SCtgTask* pTask);
D
dapan1121 已提交
640
int32_t ctgCloneTableIndex(SArray* pIndex, SArray** pRes);
D
dapan1121 已提交
641
void    ctgFreeSTableIndex(void *info);
D
dapan1121 已提交
642
void    ctgClearSubTaskRes(SCtgSubRes *pRes);
D
dapan1121 已提交
643 644
void    ctgFreeQNode(SCtgQNode *node);
void    ctgClearHandle(SCatalog* pCtg);
D
dapan1121 已提交
645
void    ctgFreeTbCacheImpl(SCtgTbCache *pCache);
D
dapan1121 已提交
646 647 648 649 650


extern SCatalogMgmt gCtgMgmt;
extern SCtgDebug gCTGDebug;
extern SCtgAsyncFps gCtgAsyncFps[];
D
dapan1121 已提交
651

H
Hongze Cheng 已提交
652 653 654 655
#ifdef __cplusplus
}
#endif

D
dapan 已提交
656
#endif /*_TD_CATALOG_INT_H_*/