ctgCache.c 77.1 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "catalogInt.h"
dengyihao's avatar
dengyihao 已提交
17
#include "query.h"
D
dapan1121 已提交
18
#include "systable.h"
dengyihao's avatar
dengyihao 已提交
19 20
#include "tname.h"
#include "trpc.h"
D
dapan1121 已提交
21

dengyihao's avatar
dengyihao 已提交
22 23 24 25 26 27 28 29 30 31 32
// Catalog cache operation table: one row per operation, holding the operation id,
// a printable name (read as `.name` when an operation is enqueued) and its handler.
SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = {
    {CTG_OP_UPDATE_VGROUP, "update vgInfo", ctgOpUpdateVgroup},
    {CTG_OP_UPDATE_TB_META, "update tbMeta", ctgOpUpdateTbMeta},
    {CTG_OP_DROP_DB_CACHE, "drop DB", ctgOpDropDbCache},
    {CTG_OP_DROP_DB_VGROUP, "drop DBVgroup", ctgOpDropDbVgroup},
    {CTG_OP_DROP_STB_META, "drop stbMeta", ctgOpDropStbMeta},
    {CTG_OP_DROP_TB_META, "drop tbMeta", ctgOpDropTbMeta},
    {CTG_OP_UPDATE_USER, "update user", ctgOpUpdateUser},
    {CTG_OP_UPDATE_VG_EPSET, "update epset", ctgOpUpdateEpset},
    {CTG_OP_UPDATE_TB_INDEX, "update tbIndex", ctgOpUpdateTbIndex},
    {CTG_OP_DROP_TB_INDEX, "drop tbIndex", ctgOpDropTbIndex},
    {CTG_OP_CLEAR_CACHE, "clear cache", ctgOpClearCache},
};
D
dapan1121 已提交
33

D
dapan1121 已提交
34
SCtgCacheItemInfo gCtgStatItem[CTG_CI_MAX_VALUE] = {
X
Xiaoyu Wang 已提交
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
    {"Cluster   ", CTG_CI_FLAG_LEVEL_GLOBAL},   // CTG_CI_CLUSTER
    {"Dnode     ", CTG_CI_FLAG_LEVEL_CLUSTER},  // CTG_CI_DNODE,
    {"Qnode     ", CTG_CI_FLAG_LEVEL_CLUSTER},  // CTG_CI_QNODE,
    {"DB        ", CTG_CI_FLAG_LEVEL_CLUSTER},  // CTG_CI_DB,
    {"DbVgroup  ", CTG_CI_FLAG_LEVEL_DB},       // CTG_CI_DB_VGROUP,
    {"DbCfg     ", CTG_CI_FLAG_LEVEL_DB},       // CTG_CI_DB_CFG,
    {"DbInfo    ", CTG_CI_FLAG_LEVEL_DB},       // CTG_CI_DB_INFO,
    {"StbMeta   ", CTG_CI_FLAG_LEVEL_DB},       // CTG_CI_STABLE_META,
    {"NtbMeta   ", CTG_CI_FLAG_LEVEL_DB},       // CTG_CI_NTABLE_META,
    {"CtbMeta   ", CTG_CI_FLAG_LEVEL_DB},       // CTG_CI_CTABLE_META,
    {"SysTblMeta", CTG_CI_FLAG_LEVEL_DB},       // CTG_CI_SYSTABLE_META,
    {"OthTblMeta", CTG_CI_FLAG_LEVEL_DB},       // CTG_CI_OTHERTABLE_META,
    {"TblSMA    ", CTG_CI_FLAG_LEVEL_DB},       // CTG_CI_TBL_SMA,
    {"TblCfg    ", CTG_CI_FLAG_LEVEL_DB},       // CTG_CI_TBL_CFG,
    {"IndexInfo ", CTG_CI_FLAG_LEVEL_DB},       // CTG_CI_INDEX_INFO,
    {"User      ", CTG_CI_FLAG_LEVEL_CLUSTER},  // CTG_CI_USER,
    {"UDF       ", CTG_CI_FLAG_LEVEL_CLUSTER},  // CTG_CI_UDF,
    {"SvrVer    ", CTG_CI_FLAG_LEVEL_CLUSTER}   // CTG_CI_SVR_VER,
D
dapan1121 已提交
53 54
};

D
dapan1121 已提交
55 56
/*
 * Take the read lock on the db's vgroup cache and report whether vgroup info is usable.
 *
 * On return with *inCache == true the read lock on dbCache->vgCache.vgLock is still
 * held. When the db is flagged deleted or has no vgInfo, the lock is dropped again and
 * *inCache is set to false.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgRLockVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache, bool *inCache) {
  CTG_LOCK(CTG_READ, &dbCache->vgCache.vgLock);

  if (dbCache->deleted) {
    CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock);
    ctgDebug("db is dropping, dbId:0x%" PRIx64, dbCache->dbId);
    *inCache = false;
  } else if (NULL == dbCache->vgCache.vgInfo) {
    CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock);
    *inCache = false;
    ctgDebug("db vgInfo is empty, dbId:0x%" PRIx64, dbCache->dbId);
  } else {
    // vgInfo present: keep the read lock for the caller
    *inCache = true;
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
80 81
/*
 * Take the write lock on the db's vgroup cache (dbCache->vgCache.vgLock).
 *
 * If the db is not flagged deleted, the lock stays held and TSDB_CODE_SUCCESS is
 * returned. Otherwise the lock is dropped again and TSDB_CODE_CTG_DB_DROPPED is
 * reported through CTG_ERR_RET.
 */
int32_t ctgWLockVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache) {
  CTG_LOCK(CTG_WRITE, &dbCache->vgCache.vgLock);

  if (!dbCache->deleted) {
    return TSDB_CODE_SUCCESS;
  }

  ctgDebug("db is dropping, dbId:0x%" PRIx64, dbCache->dbId);
  CTG_UNLOCK(CTG_WRITE, &dbCache->vgCache.vgLock);
  CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);

  // Same fall-through result as before, should CTG_ERR_RET not return.
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
92
// Drop the read lock on the db's vgroup cache (dbCache->vgCache.vgLock).
void ctgRUnlockVgInfo(SCtgDBCache *dbCache) {
  CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock);
}
D
dapan1121 已提交
93

dengyihao's avatar
dengyihao 已提交
94
// Drop the write lock on the db's vgroup cache (dbCache->vgCache.vgLock).
void ctgWUnlockVgInfo(SCtgDBCache *dbCache) {
  CTG_UNLOCK(CTG_WRITE, &dbCache->vgCache.vgLock);
}
D
dapan1121 已提交
95

dengyihao's avatar
dengyihao 已提交
96 97
/*
 * Give back a db cache entry: drop the read lock on dbCache->dbLock, then release the
 * entry's reference in the pCtg->dbCache hash.
 */
void ctgReleaseDBCache(SCatalog *pCtg, SCtgDBCache *dbCache) {
  // unlock first, then hand the hash node back
  CTG_UNLOCK(CTG_READ, &dbCache->dbLock);

  taosHashRelease(pCtg->dbCache, dbCache);
}
D
dapan1121 已提交
100

dengyihao's avatar
dengyihao 已提交
101
/*
 * Look up the cache entry of database `dbFName` in pCtg->dbCache.
 *
 * acquire == true : the entry is fetched with taosHashAcquire and its dbLock is
 *                   read-locked; the caller gives it back with ctgReleaseDBCache().
 * acquire == false: the entry is fetched with taosHashGet and no lock is taken.
 *
 * *pCache receives the entry, or NULL when it is absent or flagged deleted. The CTG_CI_DB
 * hit / not-hit counter is bumped accordingly. Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgAcquireDBCacheImpl(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache, bool acquire) {
  // When the part after the first '.' is a system db name, that part is used as the key.
  char *dot = strchr(dbFName, '.');
  if (dot && IS_SYS_DBNAME(dot + 1)) {
    dbFName = dot + 1;
  }

  SCtgDBCache *dbCache = acquire ? (SCtgDBCache *)taosHashAcquire(pCtg->dbCache, dbFName, strlen(dbFName))
                                 : (SCtgDBCache *)taosHashGet(pCtg->dbCache, dbFName, strlen(dbFName));
  if (NULL == dbCache) {
    *pCache = NULL;
    CTG_CACHE_NHIT_INC(CTG_CI_DB, 1);
    ctgDebug("db not in cache, dbFName:%s", dbFName);
    return TSDB_CODE_SUCCESS;
  }

  if (acquire) {
    CTG_LOCK(CTG_READ, &dbCache->dbLock);
  }

  if (!dbCache->deleted) {
    *pCache = dbCache;
    CTG_CACHE_HIT_INC(CTG_CI_DB, 1);
    return TSDB_CODE_SUCCESS;
  }

  // Entry is being removed: undo the acquisition (if any) and report a miss.
  if (acquire) {
    ctgReleaseDBCache(pCtg, dbCache);
  }

  *pCache = NULL;
  CTG_CACHE_NHIT_INC(CTG_CI_DB, 1);
  ctgDebug("db is removing from cache, dbFName:%s", dbFName);
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
143
// Acquiring variant of the db cache lookup: forwards to ctgAcquireDBCacheImpl() with acquire = true.
int32_t ctgAcquireDBCache(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache) {
  const bool acquire = true;
  CTG_RET(ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, acquire));
}

dengyihao's avatar
dengyihao 已提交
147
// Non-acquiring variant of the db cache lookup: forwards to ctgAcquireDBCacheImpl() with acquire = false.
int32_t ctgGetDBCache(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache) {
  const bool acquire = false;
  CTG_RET(ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, acquire));
}

dengyihao's avatar
dengyihao 已提交
151
/*
 * Give back what ctgAcquireVgInfoFromCache() handed out: first the vgroup read lock,
 * then the db cache entry itself.
 */
void ctgReleaseVgInfoToCache(SCatalog *pCtg, SCtgDBCache *dbCache) {
  ctgRUnlockVgInfo(dbCache);       // vgCache.vgLock
  ctgReleaseDBCache(pCtg, dbCache);  // dbLock + hash reference
}
D
dapan1121 已提交
155

dengyihao's avatar
dengyihao 已提交
156
/*
 * Give back a table-meta cache entry together with its db cache entry.
 *
 * Nothing happens when dbCache is NULL. Otherwise, if pCache is set, its metaLock read
 * lock is dropped and its reference in dbCache->tbCache is released; the db cache entry
 * is then released as well.
 */
void ctgReleaseTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache) {
  if (NULL == dbCache) {
    return;
  }

  if (pCache) {
    CTG_UNLOCK(CTG_READ, &pCache->metaLock);
    taosHashRelease(dbCache->tbCache, pCache);
  }

  ctgReleaseDBCache(pCtg, dbCache);
}
D
dapan1121 已提交
166

dengyihao's avatar
dengyihao 已提交
167
/*
 * Give back a table-index cache entry together with its db cache entry.
 *
 * If both pCache and dbCache are set, the read lock on pCache->indexLock is dropped and
 * the entry's reference in dbCache->tbCache is released. If dbCache is set, the db cache
 * entry is then released as well.
 *
 * The table entry is only touched when dbCache is available, because releasing it needs
 * dbCache->tbCache; this is the same guard ctgReleaseTbMetaToCache() uses.
 */
void ctgReleaseTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache) {
  if (pCache && dbCache) {
    CTG_UNLOCK(CTG_READ, &pCache->indexLock);
    taosHashRelease(dbCache->tbCache, pCache);
  }

  if (dbCache) {
    ctgReleaseDBCache(pCtg, dbCache);
  }
}

178
/*
 * Give back everything obtained through ctgAcquireVgMetaFromCache().
 *
 * Nothing happens when dbCache is NULL. Otherwise the table entry (if any) is unlocked
 * and released first, then the vgroup read lock is dropped, and finally the db cache
 * entry is released.
 */
void ctgReleaseVgMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache) {
  if (NULL == dbCache) {
    return;
  }

  if (pCache) {
    CTG_UNLOCK(CTG_READ, &pCache->metaLock);
    taosHashRelease(dbCache->tbCache, pCache);
  }

  ctgRUnlockVgInfo(dbCache);
  ctgReleaseDBCache(pCtg, dbCache);
}

dengyihao's avatar
dengyihao 已提交
190
/*
 * Acquire the db cache entry of `dbFName` with its vgroup info read-locked.
 *
 * Hit : *pCache is the entry; both the db entry and the vgroup read lock are held by the
 *       caller (see ctgReleaseVgInfoToCache()).
 * Miss: *pCache is NULL and anything acquired on the way has been released.
 *
 * The CTG_CI_DB_VGROUP hit / not-hit counter is bumped accordingly.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgAcquireVgInfoFromCache(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache) {
  SCtgDBCache *dbCache = NULL;
  bool         vgCached = false;

  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
  if (NULL == dbCache) {
    ctgDebug("db %s not in cache", dbFName);
  } else {
    ctgRLockVgInfo(pCtg, dbCache, &vgCached);
    if (vgCached) {
      *pCache = dbCache;

      CTG_CACHE_HIT_INC(CTG_CI_DB_VGROUP, 1);

      ctgDebug("Got db vgInfo from cache, dbFName:%s", dbFName);

      return TSDB_CODE_SUCCESS;
    }

    ctgDebug("vgInfo of db %s not in cache", dbFName);
    ctgReleaseDBCache(pCtg, dbCache);
  }

  // miss path
  *pCache = NULL;

  CTG_CACHE_NHIT_INC(CTG_CI_DB_VGROUP, 1);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
226
/*
 * Acquire the cached meta entry of table `tbName` in database `dbFName`.
 *
 * Hit : *pDb / *pTb receive the db and table entries; the table's metaLock is held for
 *       reading. The caller gives both back with ctgReleaseTbMetaToCache().
 * Miss: whatever was acquired is released again; *pDb and *pTb are left untouched.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgAcquireTbMetaFromCache(SCatalog *pCtg, char *dbFName, char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb) {
  SCtgDBCache *dbCache = NULL;
  SCtgTbCache *tbEntry = NULL;

  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
  if (NULL == dbCache) {
    ctgDebug("db %s not in cache", dbFName);
    goto _miss;
  }

  tbEntry = taosHashAcquire(dbCache->tbCache, tbName, strlen(tbName));
  if (NULL == tbEntry) {
    ctgDebug("tb %s not in cache, dbFName:%s", tbName, dbFName);
    goto _miss;
  }

  // The entry may exist without meta; check under the read lock.
  CTG_LOCK(CTG_READ, &tbEntry->metaLock);
  if (NULL == tbEntry->pMeta) {
    ctgDebug("tb %s meta not in cache, dbFName:%s", tbName, dbFName);
    goto _miss;
  }

  *pDb = dbCache;
  *pTb = tbEntry;

  ctgDebug("tb %s meta got in cache, dbFName:%s", tbName, dbFName);

  CTG_META_HIT_INC(tbEntry->pMeta->tableType);

  return TSDB_CODE_SUCCESS;

_miss:

  ctgReleaseTbMetaToCache(pCtg, dbCache, tbEntry);

  CTG_META_NHIT_INC();

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
265 266
/*
 * Acquire, in one go, the db entry (with vgroup info read-locked) and the meta entry of
 * table `tbName`.
 *
 * Hit : *pDb / *pTb receive the entries; the vgroup read lock and the table's metaLock
 *       read lock are held (see ctgReleaseVgMetaToCache()).
 * Miss: every lock/reference taken so far is released and both outputs are set to NULL.
 *
 * Hit / not-hit statistics are updated at the step where the outcome is known.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgAcquireVgMetaFromCache(SCatalog *pCtg, const char *dbFName, const char *tbName, SCtgDBCache **pDb,
                                  SCtgTbCache **pTb) {
  SCtgDBCache *dbCache = NULL;
  SCtgTbCache *tbEntry = NULL;
  bool         vgLocked = false;

  // step 1: db entry
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
  if (NULL == dbCache) {
    ctgDebug("db %s not in cache", dbFName);
    CTG_CACHE_NHIT_INC(CTG_CI_DB_VGROUP, 1);
    goto _miss;
  }

  // step 2: vgroup info under read lock
  ctgRLockVgInfo(pCtg, dbCache, &vgLocked);
  if (!vgLocked) {
    ctgDebug("vgInfo of db %s not in cache", dbFName);
    CTG_CACHE_NHIT_INC(CTG_CI_DB_VGROUP, 1);
    goto _miss;
  }

  *pDb = dbCache;

  CTG_CACHE_HIT_INC(CTG_CI_DB_VGROUP, 1);

  ctgDebug("Got db vgInfo from cache, dbFName:%s", dbFName);

  // step 3: table entry and its meta under read lock
  tbEntry = taosHashAcquire(dbCache->tbCache, tbName, strlen(tbName));
  if (NULL == tbEntry) {
    ctgDebug("tb %s not in cache, dbFName:%s", tbName, dbFName);
    CTG_META_NHIT_INC();
    goto _miss;
  }

  CTG_LOCK(CTG_READ, &tbEntry->metaLock);
  if (NULL == tbEntry->pMeta) {
    ctgDebug("tb %s meta not in cache, dbFName:%s", tbName, dbFName);
    CTG_META_NHIT_INC();
    goto _miss;
  }

  *pTb = tbEntry;

  ctgDebug("tb %s meta got in cache, dbFName:%s", tbName, dbFName);

  CTG_META_HIT_INC(tbEntry->pMeta->tableType);

  return TSDB_CODE_SUCCESS;

_miss:

  // unwind in reverse order of acquisition
  if (tbEntry) {
    CTG_UNLOCK(CTG_READ, &tbEntry->metaLock);
    taosHashRelease(dbCache->tbCache, tbEntry);
  }

  if (vgLocked) {
    ctgRUnlockVgInfo(dbCache);
  }

  if (dbCache) {
    ctgReleaseDBCache(pCtg, dbCache);
  }

  *pDb = NULL;
  *pTb = NULL;

  return TSDB_CODE_SUCCESS;
}

334
/*
dengyihao's avatar
dengyihao 已提交
335 336 337
int32_t ctgAcquireStbMetaFromCache(SCatalog *pCtg, char *dbFName, uint64_t suid, SCtgDBCache **pDb, SCtgTbCache **pTb) {
  SCtgDBCache *dbCache = NULL;
  SCtgTbCache *pCache = NULL;
D
dapan1121 已提交
338 339 340 341 342
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
  if (NULL == dbCache) {
    ctgDebug("db %s not in cache", dbFName);
    goto _return;
  }
dengyihao's avatar
dengyihao 已提交
343 344

  char *stName = taosHashAcquire(dbCache->stbCache, &suid, sizeof(suid));
D
dapan1121 已提交
345
  if (NULL == stName) {
D
dapan1121 已提交
346
    ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", suid, dbFName);
D
dapan1121 已提交
347 348 349 350 351
    goto _return;
  }

  pCache = taosHashAcquire(dbCache->tbCache, stName, strlen(stName));
  if (NULL == pCache) {
D
dapan1121 已提交
352
    ctgDebug("stb 0x%" PRIx64 " name %s not in cache, dbFName:%s", suid, stName, dbFName);
D
dapan1121 已提交
353 354 355 356
    taosHashRelease(dbCache->stbCache, stName);
    goto _return;
  }

D
dapan1121 已提交
357 358
  taosHashRelease(dbCache->stbCache, stName);

D
dapan1121 已提交
359 360
  CTG_LOCK(CTG_READ, &pCache->metaLock);
  if (NULL == pCache->pMeta) {
D
dapan1121 已提交
361
    ctgDebug("stb 0x%" PRIx64 " meta not in cache, dbFName:%s", suid, dbFName);
D
dapan1121 已提交
362 363 364 365 366 367
    goto _return;
  }

  *pDb = dbCache;
  *pTb = pCache;

D
dapan1121 已提交
368
  ctgDebug("stb 0x%" PRIx64 " meta got in cache, dbFName:%s", suid, dbFName);
dengyihao's avatar
dengyihao 已提交
369

D
dapan1121 已提交
370
  CTG_META_HIT_INC(pCache->pMeta->tableType, 1);
D
dapan1121 已提交
371 372 373 374 375 376 377

  return TSDB_CODE_SUCCESS;

_return:

  ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);

D
dapan1121 已提交
378
  CTG_META_NHIT_INC(1);
D
dapan1121 已提交
379 380 381

  *pDb = NULL;
  *pTb = NULL;
dengyihao's avatar
dengyihao 已提交
382

D
dapan1121 已提交
383 384
  return TSDB_CODE_SUCCESS;
}
385
*/
D
dapan1121 已提交
386

X
Xiaoyu Wang 已提交
387 388
/*
 * Acquire the cached meta entry of the super table identified by `suid`, using an
 * already-acquired db cache entry.
 *
 * The suid is first mapped to a table name through dbCache->stbCache; that name is then
 * looked up in dbCache->tbCache.
 *
 * Hit : *pTb is the table entry with its metaLock held for reading.
 * Miss: *pTb is NULL and ctgReleaseTbMetaToCache() has been called, so the db cache
 *       entry passed in has been released too.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgAcquireStbMetaFromCache(SCtgDBCache *dbCache, SCatalog *pCtg, char *dbFName, uint64_t suid,
                                   SCtgTbCache **pTb) {
  SCtgTbCache *tbEntry = NULL;
  char        *cachedName = taosHashAcquire(dbCache->stbCache, &suid, sizeof(suid));
  if (NULL == cachedName) {
    ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", suid, dbFName);
    goto _miss;
  }

  tbEntry = taosHashAcquire(dbCache->tbCache, cachedName, strlen(cachedName));
  if (NULL == tbEntry) {
    // log while the name entry is still held
    ctgDebug("stb 0x%" PRIx64 " name %s not in cache, dbFName:%s", suid, cachedName, dbFName);
  }

  // the name entry is not needed past this point, found or not
  taosHashRelease(dbCache->stbCache, cachedName);

  if (NULL == tbEntry) {
    goto _miss;
  }

  CTG_LOCK(CTG_READ, &tbEntry->metaLock);
  if (NULL == tbEntry->pMeta) {
    ctgDebug("stb 0x%" PRIx64 " meta not in cache, dbFName:%s", suid, dbFName);
    goto _miss;
  }

  *pTb = tbEntry;

  ctgDebug("stb 0x%" PRIx64 " meta got in cache, dbFName:%s", suid, dbFName);

  CTG_META_HIT_INC(tbEntry->pMeta->tableType);

  return TSDB_CODE_SUCCESS;

_miss:

  ctgReleaseTbMetaToCache(pCtg, dbCache, tbEntry);

  CTG_META_NHIT_INC();

  *pTb = NULL;

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
430
/*
 * Acquire the cached index entry of table `tbName` in database `dbFName`.
 *
 * Hit : *pDb / *pTb receive the db and table entries; the table's indexLock is held for
 *       reading. The caller gives both back with ctgReleaseTbIndexToCache().
 * Miss: whatever was acquired is released again; *pDb and *pTb are left untouched.
 *
 * The CTG_CI_TBL_SMA hit / not-hit counter is bumped accordingly.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgAcquireTbIndexFromCache(SCatalog *pCtg, char *dbFName, char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb) {
  SCtgDBCache *dbCache = NULL;
  SCtgTbCache *tbEntry = NULL;

  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
  if (NULL == dbCache) {
    ctgDebug("db %s not in cache", dbFName);
    goto _miss;
  }

  tbEntry = taosHashAcquire(dbCache->tbCache, tbName, strlen(tbName));
  if (NULL == tbEntry) {
    ctgDebug("tb %s not in cache, dbFName:%s", tbName, dbFName);
    goto _miss;
  }

  // The entry may exist without index data; check under the read lock.
  CTG_LOCK(CTG_READ, &tbEntry->indexLock);
  if (NULL == tbEntry->pIndex) {
    ctgDebug("tb %s index not in cache, dbFName:%s", tbName, dbFName);
    goto _miss;
  }

  *pDb = dbCache;
  *pTb = tbEntry;

  ctgDebug("tb %s index got in cache, dbFName:%s", tbName, dbFName);

  CTG_CACHE_HIT_INC(CTG_CI_TBL_SMA, 1);

  return TSDB_CODE_SUCCESS;

_miss:

  ctgReleaseTbIndexToCache(pCtg, dbCache, tbEntry);

  CTG_CACHE_NHIT_INC(CTG_CI_TBL_SMA, 1);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
470
/*
 * Report whether meta for table `tbName` of database `dbFName` is currently cached.
 *
 * *exist is set to 1 when ctgAcquireTbMetaFromCache() yields an entry, 0 otherwise.
 * Nothing stays acquired on return. Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgTbMetaExistInCache(SCatalog *pCtg, char *dbFName, char *tbName, int32_t *exist) {
  SCtgDBCache *dbCache = NULL;
  SCtgTbCache *tbCache = NULL;

  ctgAcquireTbMetaFromCache(pCtg, dbFName, tbName, &dbCache, &tbCache);

  *exist = (NULL != tbCache) ? 1 : 0;

  // Released in both outcomes; the release helper ignores a NULL db entry.
  ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
487 488
/*
 * Build a heap copy of a cached table meta into *pTableMeta and fill ctx->tbInfo.
 *
 * On entry *pDb / *pTb are an acquired db entry and a read-locked table entry.
 *
 * - Non-child table: the whole cached meta (CTG_META_SIZE bytes) is copied; *pDb / *pTb
 *   are left as they were.
 * - Child table: the leading SCTableMeta part is copied, the child entry is unlocked and
 *   released, and the super table's entry is acquired by suid. Everything from
 *   `sversion` onward is then copied from the super table meta. On success *pTb is the
 *   super table entry, still read-locked.
 *   If the super table meta is not cached, *pTableMeta is freed, *pDb and *pTb end up
 *   NULL (the db entry was released by the stb lookup) and TSDB_CODE_SUCCESS is
 *   returned.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY on allocation failure and TSDB_CODE_CTG_INTERNAL_ERROR
 * when the cached super table's suid differs from the child's. In both error cases
 * *pTableMeta is NULL and any still-acquired entries remain in *pDb / *pTb for the
 * caller to release.
 */
int32_t ctgCopyTbMeta(SCatalog *pCtg, SCtgTbMetaCtx *ctx, SCtgDBCache **pDb, SCtgTbCache **pTb, STableMeta **pTableMeta,
                      char *dbFName) {
  SCtgDBCache *dbCache = *pDb;
  SCtgTbCache *tbCache = *pTb;
  STableMeta  *tbMeta = tbCache->pMeta;
  ctx->tbInfo.inCache = true;
  ctx->tbInfo.dbId = dbCache->dbId;
  ctx->tbInfo.suid = tbMeta->suid;
  ctx->tbInfo.tbType = tbMeta->tableType;

  if (tbMeta->tableType != TSDB_CHILD_TABLE) {
    int32_t metaSize = CTG_META_SIZE(tbMeta);
    *pTableMeta = taosMemoryCalloc(1, metaSize);
    if (NULL == *pTableMeta) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    memcpy(*pTableMeta, tbMeta, metaSize);

    ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", ctx->pName->tname, tbMeta->tableType, dbFName);
    return TSDB_CODE_SUCCESS;
  }

  // PROCESS FOR CHILD TABLE

  int32_t metaSize = sizeof(SCTableMeta);
  *pTableMeta = taosMemoryCalloc(1, metaSize);
  if (NULL == *pTableMeta) {
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  memcpy(*pTableMeta, tbMeta, metaSize);

  // Done with the child entry: release it but keep the db entry for the stb lookup.
  CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
  taosHashRelease(dbCache->tbCache, tbCache);
  *pTb = NULL;

  ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", ctx->pName->tname,
           ctx->tbInfo.tbType, dbFName);

  ctgAcquireStbMetaFromCache(dbCache, pCtg, dbFName, ctx->tbInfo.suid, &tbCache);
  if (NULL == tbCache) {
    taosMemoryFreeClear(*pTableMeta);
    // the failed stb lookup already released the db entry
    *pDb = NULL;
    ctgDebug("stb 0x%" PRIx64 " meta not in cache", ctx->tbInfo.suid);
    return TSDB_CODE_SUCCESS;
  }

  *pTb = tbCache;

  STableMeta *stbMeta = tbCache->pMeta;
  if (stbMeta->suid != ctx->tbInfo.suid) {
    ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%" PRIx64, stbMeta->suid, ctx->tbInfo.suid);
    taosMemoryFreeClear(*pTableMeta);
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  metaSize = CTG_META_SIZE(stbMeta);

  // Grow through a temporary so the child copy is not leaked if the reallocation fails.
  STableMeta *grown = taosMemoryRealloc(*pTableMeta, metaSize);
  if (NULL == grown) {
    taosMemoryFreeClear(*pTableMeta);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }
  *pTableMeta = grown;

  // Keep the child-specific head, take everything from sversion on from the super table.
  memcpy(&(*pTableMeta)->sversion, &stbMeta->sversion, metaSize - sizeof(SCTableMeta));

  return TSDB_CODE_SUCCESS;
}

/*
 * Read a copy of a table's meta from the cache.
 *
 * *pTableMeta is NULL on a cache miss (return value TSDB_CODE_SUCCESS) and on error;
 * otherwise it is a heap copy produced by ctgCopyTbMeta(). No cache entry stays acquired
 * on return.
 *
 * For system-db requests (CTG_FLAG_IS_SYS_DB) the plain db name is used as the cache key;
 * otherwise the full db name from tNameGetFullDbName() is used.
 */
int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta **pTableMeta) {
  int32_t      code = 0;
  SCtgDBCache *dbCache = NULL;
  SCtgTbCache *tbCache = NULL;
  char         dbFName[TSDB_DB_FNAME_LEN] = {0};

  *pTableMeta = NULL;

  if (CTG_FLAG_IS_SYS_DB(ctx->flag)) {
    strcpy(dbFName, ctx->pName->dbname);
  } else {
    tNameGetFullDbName(ctx->pName, dbFName);
  }

  ctgAcquireTbMetaFromCache(pCtg, dbFName, ctx->pName->tname, &dbCache, &tbCache);
  if (NULL == tbCache) {
    // miss: nothing to copy
    ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
    return TSDB_CODE_SUCCESS;
  }

  // ctgCopyTbMeta may swap or clear dbCache / tbCache; release whatever it leaves behind.
  CTG_ERR_JRET(ctgCopyTbMeta(pCtg, ctx, &dbCache, &tbCache, pTableMeta, dbFName));

  ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);

  ctgDebug("Got tb %s meta from cache, dbFName:%s", ctx->pName->tname, dbFName);

  return TSDB_CODE_SUCCESS;

_return:

  ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
  taosMemoryFreeClear(*pTableMeta);
  *pTableMeta = NULL;

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
593 594
/*
 * Read a table's schema/tag versions from the cache.
 *
 * *sver and *tver start out as -1 and stay -1 on a cache miss. When the table meta is
 * cached, *tbType and *suid are filled from it. For a non-child table the versions come
 * from the table itself. For a child table they come from its super table, whose name
 * (the key of its cache entry) is also copied into `stbName`; the caller must provide a
 * buffer large enough for that name plus a terminating 0.
 *
 * Returns TSDB_CODE_SUCCESS on hit or miss, or TSDB_CODE_CTG_INTERNAL_ERROR when the
 * cached super table's suid does not match the child's suid. No cache entry stays
 * acquired on return.
 */
int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver, int32_t *tver, int32_t *tbType,
                              uint64_t *suid, char *stbName) {
  *sver = -1;
  *tver = -1;

  SCtgDBCache *dbCache = NULL;
  SCtgTbCache *tbCache = NULL;
  char         dbFName[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, dbFName);

  ctgAcquireTbMetaFromCache(pCtg, dbFName, pTableName->tname, &dbCache, &tbCache);
  if (NULL == tbCache) {
    ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
    return TSDB_CODE_SUCCESS;
  }

  STableMeta *tbMeta = tbCache->pMeta;
  *tbType = tbMeta->tableType;
  *suid = tbMeta->suid;

  if (*tbType != TSDB_CHILD_TABLE) {
    *sver = tbMeta->sversion;
    *tver = tbMeta->tversion;

    ctgDebug("Got tb %s ver from cache, dbFName:%s, tbType:%d, sver:%d, tver:%d, suid:0x%" PRIx64, pTableName->tname,
             dbFName, *tbType, *sver, *tver, *suid);

    ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
    return TSDB_CODE_SUCCESS;
  }

  // PROCESS FOR CHILD TABLE

  // Release only the child entry; the db entry is still needed for the stb lookup.
  // tbCache is known to be non-NULL here (checked right after the acquire).
  CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
  taosHashRelease(dbCache->tbCache, tbCache);

  ctgDebug("Got ctb %s ver from cache, will continue to get its stb ver, dbFName:%s", pTableName->tname, dbFName);

  ctgAcquireStbMetaFromCache(dbCache, pCtg, dbFName, *suid, &tbCache);
  if (NULL == tbCache) {
    // the failed stb lookup already released the db entry
    ctgDebug("stb 0x%" PRIx64 " meta not in cache", *suid);
    return TSDB_CODE_SUCCESS;
  }

  STableMeta *stbMeta = tbCache->pMeta;
  if (stbMeta->suid != *suid) {
    // Log before releasing: stbMeta belongs to the cache entry and must not be read
    // once the entry has been unlocked and released.
    ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid:0x%" PRIx64, stbMeta->suid, *suid);
    ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  // The super table's name is the key of its cache entry.
  size_t nameLen = 0;
  char  *name = taosHashGetKey(tbCache, &nameLen);

  strncpy(stbName, name, nameLen);
  stbName[nameLen] = 0;

  *sver = stbMeta->sversion;
  *tver = stbMeta->tversion;

  ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);

  ctgDebug("Got tb %s sver %d tver %d from cache, type:%d, dbFName:%s", pTableName->tname, *sver, *tver, *tbType,
           dbFName);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
665
/*
 * Read a table's type from its cached meta.
 *
 * *tbType is written only when the meta is cached; on a miss it is left untouched.
 * No cache entry stays acquired on return. Returns TSDB_CODE_SUCCESS unless the cache
 * lookup itself reports an error.
 */
int32_t ctgReadTbTypeFromCache(SCatalog *pCtg, char *dbFName, char *tbName, int32_t *tbType) {
  SCtgDBCache *dbCache = NULL;
  SCtgTbCache *tbCache = NULL;

  CTG_ERR_RET(ctgAcquireTbMetaFromCache(pCtg, dbFName, tbName, &dbCache, &tbCache));

  const bool found = (NULL != tbCache);
  if (found) {
    *tbType = tbCache->pMeta->tableType;
  }

  ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);

  if (found) {
    ctgDebug("Got tb %s tbType %d from cache, dbFName:%s", tbName, *tbType, dbFName);
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
682 683
/*
 * Read a clone of a table's cached index list.
 *
 * *pRes is NULL on a cache miss (return value TSDB_CODE_SUCCESS); on a hit it receives
 * whatever ctgCloneTableIndex() produces, and that call's code is returned. No cache
 * entry stays acquired on return.
 */
int32_t ctgReadTbIndexFromCache(SCatalog *pCtg, SName *pTableName, SArray **pRes) {
  int32_t      code = 0;
  SCtgDBCache *dbCache = NULL;
  SCtgTbCache *tbCache = NULL;
  char         dbFName[TSDB_DB_FNAME_LEN] = {0};

  *pRes = NULL;
  tNameGetFullDbName(pTableName, dbFName);

  ctgAcquireTbIndexFromCache(pCtg, dbFName, pTableName->tname, &dbCache, &tbCache);
  if (NULL == tbCache) {
    ctgReleaseTbIndexToCache(pCtg, dbCache, tbCache);
    return TSDB_CODE_SUCCESS;
  }

  // Clone while the index read lock is held; success and failure share the cleanup below.
  CTG_ERR_JRET(ctgCloneTableIndex(tbCache->pIndex->pIndex, pRes));

_return:

  ctgReleaseTbIndexToCache(pCtg, dbCache, tbCache);

  CTG_RET(code);
}

706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728
/*
 * Look up the cached name of the super table with id `suid` in database `dbFName`.
 *
 * *stbName is NULL when the db or the suid is not cached; otherwise it is a
 * taosStrdup() copy of the cached name, owned by the caller.
 *
 * The db cache entry acquired for the lookup is released on every path once it has
 * been obtained; previously it was never released. Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgGetCachedStbNameFromSuid(SCatalog* pCtg, char* dbFName, uint64_t suid, char **stbName) {
  *stbName = NULL;

  SCtgDBCache *dbCache = NULL;
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
  if (NULL == dbCache) {
    ctgDebug("db %s not in cache", dbFName);
    return TSDB_CODE_SUCCESS;
  }

  char *stb = taosHashAcquire(dbCache->stbCache, &suid, sizeof(suid));
  if (NULL == stb) {
    ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", suid, dbFName);
    ctgReleaseDBCache(pCtg, dbCache);
    return TSDB_CODE_SUCCESS;
  }

  *stbName = taosStrdup(stb);

  // Release in reverse order of acquisition: name entry first, then the db entry.
  taosHashRelease(dbCache->stbCache, stb);
  ctgReleaseDBCache(pCtg, dbCache);

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
729
/*
 * Try to answer an authorization request from the user cache.
 *
 * - Requests against a system db pass immediately (*inCache = true, pass = true).
 * - Otherwise the user is looked up in pCtg->userCache. Its auth info is copied under
 *   the user's read lock and evaluated by ctgChkSetAuthRes().
 * - *inCache ends up false when the user is not cached, when ctgChkSetAuthRes() reports
 *   a failure (NOTE(review): relies on CTG_ERR_JRET jumping to _return on a non-zero
 *   code - confirm), or when pRes->metaNotExists is set. In those cases
 *   TSDB_CODE_SUCCESS is returned.
 */
int32_t ctgChkAuthFromCache(SCatalog *pCtg, SUserAuthInfo *pReq, bool *inCache, SCtgAuthRsp *pRes) {
  int32_t       code = 0;
  SCtgUserAuth *pUser = NULL;
  SCtgAuthReq   req = {0};

  if (IS_SYS_DBNAME(pReq->tbName.dbname)) {
    *inCache = true;
    pRes->pRawRes->pass = true;
    ctgDebug("sysdb %s, pass", pReq->tbName.dbname);
    return TSDB_CODE_SUCCESS;
  }

  pUser = (SCtgUserAuth *)taosHashGet(pCtg->userCache, pReq->user, strlen(pReq->user));
  if (NULL == pUser) {
    ctgDebug("user not in cache, user:%s", pReq->user);
    goto _return;
  }

  *inCache = true;

  ctgDebug("Got user from cache, user:%s", pReq->user);
  CTG_CACHE_HIT_INC(CTG_CI_USER, 1);

  req.pRawReq = pReq;
  req.onlyCache = true;

  // Copy and evaluate the cached auth info while the user's read lock is held.
  CTG_LOCK(CTG_READ, &pUser->lock);
  memcpy(&req.authInfo, &pUser->userAuth, sizeof(pUser->userAuth));
  code = ctgChkSetAuthRes(pCtg, &req, pRes);
  CTG_UNLOCK(CTG_READ, &pUser->lock);
  CTG_ERR_JRET(code);

  if (pRes->metaNotExists) {
    goto _return;
  }

  CTG_RET(code);

_return:

  *inCache = false;
  CTG_CACHE_NHIT_INC(CTG_CI_USER, 1);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
772
/**
 * Pop the next operation from the global cache-operation queue.
 *
 * The current head node is released and its successor becomes the new head; the operation
 * carried by that successor is handed back through *op.
 *
 * @param op  out: operation stored in the node that became the new head
 *
 * NOTE(review): head->next is dereferenced without a NULL check - presumably the caller only
 * dequeues after the request semaphore signalled a pending node; confirm.
 */
void ctgDequeue(SCtgCacheOperation **op) {
  SCtgQNode *oldHead = gCtgMgmt.queue.head;
  SCtgQNode *newHead = oldHead->next;

  gCtgMgmt.queue.head = newHead;

  CTG_QUEUE_DEC();

  taosMemoryFreeClear(oldHead);

  *op = newHead->op;
}

dengyihao's avatar
dengyihao 已提交
785
/**
 * Append a cache operation to the global operation queue and wake the consumer.
 *
 * For a synchronous operation (operation->syncOp) the call blocks on operation->rspSem until
 * it is posted, then destroys the semaphore and frees the operation. Unless
 * operation->unLocked is set, the read lock on gCtgMgmt.lock is dropped for the duration of
 * the wait and re-taken afterwards.
 *
 * @param pCtg       catalog handle (used for logging)
 * @param operation  heap-allocated operation; on node allocation failure both operation->data
 *                   and the operation are freed here, and when the queue is stopped the node is
 *                   passed to ctgFreeQNode()
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the queue node cannot be allocated,
 *         or TSDB_CODE_CTG_EXIT when the queue no longer accepts operations
 */
int32_t ctgEnqueue(SCatalog *pCtg, SCtgCacheOperation *operation) {
  SCtgQNode *node = taosMemoryCalloc(1, sizeof(SCtgQNode));
  if (NULL == node) {
    qError("calloc %d failed", (int32_t)sizeof(SCtgQNode));
    taosMemoryFree(operation->data);
    taosMemoryFree(operation);
    CTG_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  // Snapshot what is needed after the node is published - presumably because an asynchronous
  // operation may be released by the consumer as soon as it is visible in the queue.
  bool  syncOp = operation->syncOp;
  char *opName = gCtgCacheOperation[operation->opId].name;
  if (syncOp) {
    tsem_init(&operation->rspSem, 0, 0);
  }

  node->op = operation;

  CTG_LOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);

  if (gCtgMgmt.queue.stopQueue) {
    // The operation will never run: release the response semaphore before the node
    // (and the operation it carries) is freed.
    if (syncOp) {
      tsem_destroy(&operation->rspSem);
    }
    ctgFreeQNode(node);
    CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);
    CTG_RET(TSDB_CODE_CTG_EXIT);
  }

  gCtgMgmt.queue.tail->next = node;
  gCtgMgmt.queue.tail = node;

  // A stop request closes the queue for every later enqueue.
  gCtgMgmt.queue.stopQueue = operation->stopQueue;

  CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);

  ctgDebug("action [%s] added into queue", opName);

  CTG_QUEUE_INC();
  CTG_STAT_RT_INC(numOfOpEnqueue, 1);

  tsem_post(&gCtgMgmt.queue.reqSem);

  if (syncOp) {
    if (!operation->unLocked) {
      CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock);
    }
    tsem_wait(&operation->rspSem);
    if (!operation->unLocked) {
      CTG_LOCK(CTG_READ, &gCtgMgmt.lock);
    }
    // Pair the earlier tsem_init(): the semaphore was previously leaked here.
    tsem_destroy(&operation->rspSem);
    taosMemoryFree(operation);
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
838 839
/**
 * Build a synchronous CTG_OP_DROP_DB_CACHE operation and hand it to ctgEnqueue().
 *
 * @param pCtg     catalog the operation applies to
 * @param dbFName  database name; when the text after the first '.' is a system database
 *                 name, only that suffix is stored in the message
 * @param dbId     database id stored in the message
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY on allocation failure, or the error
 *         reported by ctgEnqueue()
 */
int32_t ctgDropDbCacheEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Previously dereferenced unchecked.
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_DROP_DB_CACHE;
  op->syncOp = true;

  SCtgDropDBMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDBMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDBMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  const char *p = strchr(dbFName, '.');
  if (p && IS_SYS_DBNAME(p + 1)) {
    dbFName = p + 1;
  }

  msg->pCtg = pCtg;
  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  msg->dbId = dbId;

  op->data = msg;

  // From here on ctgEnqueue() is responsible for op and msg.
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
871 872
/**
 * Build a CTG_OP_DROP_DB_VGROUP operation and hand it to ctgEnqueue().
 *
 * @param pCtg     catalog the operation applies to
 * @param dbFName  database name; when the text after the first '.' is a system database
 *                 name, only that suffix is stored in the message
 * @param syncOp   when true, ctgEnqueue() blocks until the operation's response semaphore
 *                 is posted
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY on allocation failure, or the error
 *         reported by ctgEnqueue()
 */
int32_t ctgDropDbVgroupEnqueue(SCatalog *pCtg, const char *dbFName, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Previously dereferenced unchecked.
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_DROP_DB_VGROUP;
  op->syncOp = syncOp;

  SCtgDropDbVgroupMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDbVgroupMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDbVgroupMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  const char *p = strchr(dbFName, '.');
  if (p && IS_SYS_DBNAME(p + 1)) {
    dbFName = p + 1;
  }

  msg->pCtg = pCtg;
  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));

  op->data = msg;

  // From here on ctgEnqueue() is responsible for op and msg.
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
903 904 905
/**
 * Build a CTG_OP_DROP_STB_META operation and hand it to ctgEnqueue().
 *
 * @param pCtg     catalog the operation applies to
 * @param dbFName  database name copied into the message
 * @param dbId     database id stored in the message
 * @param stbName  super table name copied into the message
 * @param suid     super table uid stored in the message
 * @param syncOp   when true, ctgEnqueue() blocks until the operation's response semaphore
 *                 is posted
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY on allocation failure, or the error
 *         reported by ctgEnqueue()
 */
int32_t ctgDropStbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid,
                              bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Previously dereferenced unchecked.
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_DROP_STB_META;
  op->syncOp = syncOp;

  SCtgDropStbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropStbMetaMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropStbMetaMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  tstrncpy(msg->stbName, stbName, sizeof(msg->stbName));
  msg->dbId = dbId;
  msg->suid = suid;

  op->data = msg;

  // From here on ctgEnqueue() is responsible for op and msg.
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
934 935
/**
 * Build a CTG_OP_DROP_TB_META operation and hand it to ctgEnqueue().
 *
 * @param pCtg     catalog the operation applies to
 * @param dbFName  database name copied into the message
 * @param dbId     database id stored in the message
 * @param tbName   table name copied into the message
 * @param syncOp   when true, ctgEnqueue() blocks until the operation's response semaphore
 *                 is posted
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY on allocation failure, or the error
 *         reported by ctgEnqueue()
 */
int32_t ctgDropTbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Previously dereferenced unchecked.
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_DROP_TB_META;
  op->syncOp = syncOp;

  SCtgDropTblMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTblMetaMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTblMetaMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  tstrncpy(msg->tbName, tbName, sizeof(msg->tbName));
  msg->dbId = dbId;

  op->data = msg;

  // From here on ctgEnqueue() is responsible for op and msg.
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
963 964
/**
 * Build a CTG_OP_UPDATE_VGROUP operation carrying dbInfo and hand it to ctgEnqueue().
 *
 * dbInfo is consumed: on every failure path in this function it is released with
 * freeVgInfo(); on success it is referenced by the queued message.
 *
 * @param pCtg     catalog the operation applies to
 * @param dbFName  database name; when the text after the first '.' is a system database
 *                 name, only that suffix is stored in the message
 * @param dbId     database id stored in the message
 * @param dbInfo   vgroup info; passed through ctgMakeVgArray() before being queued
 * @param syncOp   when true, ctgEnqueue() blocks until the operation's response semaphore
 *                 is posted
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY on allocation failure, or the error
 *         reported by ctgMakeVgArray() / ctgEnqueue()
 */
int32_t ctgUpdateVgroupEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, SDBVgInfo *dbInfo, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Previously dereferenced unchecked; release dbInfo like the other failure paths do.
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    freeVgInfo(dbInfo);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_UPDATE_VGROUP;
  op->syncOp = syncOp;

  SCtgUpdateVgMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateVgMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg));
    taosMemoryFree(op);
    freeVgInfo(dbInfo);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  const char *p = strchr(dbFName, '.');
  if (p && IS_SYS_DBNAME(p + 1)) {
    dbFName = p + 1;
  }

  code = ctgMakeVgArray(dbInfo);
  if (code) {
    taosMemoryFree(op);
    taosMemoryFree(msg);
    freeVgInfo(dbInfo);
    CTG_ERR_RET(code);
  }

  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  msg->pCtg = pCtg;
  msg->dbId = dbId;
  msg->dbInfo = dbInfo;

  op->data = msg;

  // From here on ctgEnqueue() is responsible for op and msg (but not for dbInfo).
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  freeVgInfo(dbInfo);
  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1007 1008
/**
 * Build a CTG_OP_UPDATE_TB_META operation carrying output and hand it to ctgEnqueue().
 *
 * output is consumed: on every failure path output->tbMeta and output are freed; on success
 * the queued message references output.
 *
 * @param pCtg    catalog the operation applies to
 * @param output  table meta result; when the text after the first '.' in output->dbFName is
 *                a system database name, the prefix is stripped in place
 * @param syncOp  when true, ctgEnqueue() blocks until the operation's response semaphore
 *                is posted
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY on allocation failure, or the error
 *         reported by ctgEnqueue()
 */
int32_t ctgUpdateTbMetaEnqueue(SCatalog *pCtg, STableMetaOutput *output, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Previously dereferenced unchecked.
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_UPDATE_TB_META;
  op->syncOp = syncOp;

  SCtgUpdateTbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbMetaMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbMetaMsg));
    taosMemoryFree(op);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  char *p = strchr(output->dbFName, '.');
  if (p && IS_SYS_DBNAME(p + 1)) {
    // Shift the suffix to the front of the buffer INCLUDING its terminator. Moving only
    // strlen() bytes (as before) left the old tail behind, e.g. "1.abc" became "abcbc".
    // The suffix already lives inside dbFName, so the shifted copy cannot overflow it.
    memmove(output->dbFName, p + 1, strlen(p + 1) + 1);
  }

  msg->pCtg = pCtg;
  msg->pMeta = output;

  op->data = msg;

  // From here on ctgEnqueue() is responsible for op and msg (but not for output).
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  if (output) {
    taosMemoryFree(output->tbMeta);
    taosMemoryFree(output);
  }

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1045 1046
/**
 * Build a CTG_OP_UPDATE_VG_EPSET operation and hand it to ctgEnqueue().
 *
 * The operation is left asynchronous (syncOp stays zero from calloc).
 *
 * @param pCtg     catalog the operation applies to
 * @param dbFName  database name copied into the message
 * @param vgId     vgroup id stored in the message
 * @param pEpSet   endpoint set; copied by value into the message
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY on allocation failure, or the error
 *         reported by ctgEnqueue()
 */
int32_t ctgUpdateVgEpsetEnqueue(SCatalog *pCtg, char *dbFName, int32_t vgId, SEpSet *pEpSet) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Previously dereferenced unchecked.
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_UPDATE_VG_EPSET;

  SCtgUpdateEpsetMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateEpsetMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateEpsetMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  msg->vgId = vgId;
  msg->epSet = *pEpSet;

  op->data = msg;

  // From here on ctgEnqueue() is responsible for op and msg.
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1073 1074
/**
 * Build a CTG_OP_UPDATE_USER operation carrying a copy of *pAuth and hand it to ctgEnqueue().
 *
 * The struct is copied by value into the message, so on success the message shares whatever
 * *pAuth points to. On every failure path the contents of *pAuth are released with
 * tFreeSGetUserAuthRsp(); previously the allocation-failure path skipped that cleanup while
 * the enqueue-failure path performed it.
 *
 * @param pCtg    catalog the operation applies to
 * @param pAuth   user authorization response to cache
 * @param syncOp  when true, ctgEnqueue() blocks until the operation's response semaphore
 *                is posted
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY on allocation failure, or the error
 *         reported by ctgEnqueue()
 */
int32_t ctgUpdateUserEnqueue(SCatalog *pCtg, SGetUserAuthRsp *pAuth, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Previously dereferenced unchecked.
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_UPDATE_USER;
  op->syncOp = syncOp;

  SCtgUpdateUserMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateUserMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateUserMsg));
    taosMemoryFree(op);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  msg->userAuth = *pAuth;

  op->data = msg;

  // From here on ctgEnqueue() is responsible for op and msg (but not for pAuth's contents).
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  tFreeSGetUserAuthRsp(pAuth);

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1102 1103
/**
 * Build a CTG_OP_UPDATE_TB_INDEX operation carrying *pIndex and hand it to ctgEnqueue().
 *
 * *pIndex is consumed either way: on success it is referenced by the queued message and
 * *pIndex is reset to NULL; on failure its index array and the struct itself are freed and
 * *pIndex is cleared.
 *
 * @param pCtg    catalog the operation applies to
 * @param pIndex  in/out: address of the table index pointer to hand over
 * @param syncOp  when true, ctgEnqueue() blocks until the operation's response semaphore
 *                is posted
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY on allocation failure, or the error
 *         reported by ctgEnqueue()
 */
int32_t ctgUpdateTbIndexEnqueue(SCatalog *pCtg, STableIndex **pIndex, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Previously dereferenced unchecked.
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_UPDATE_TB_INDEX;
  op->syncOp = syncOp;

  SCtgUpdateTbIndexMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbIndexMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbIndexMsg));
    taosMemoryFree(op);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  msg->pIndex = *pIndex;

  op->data = msg;

  // From here on ctgEnqueue() is responsible for op and msg (but not for *pIndex).
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  // Ownership moved to the queued message.
  *pIndex = NULL;
  return TSDB_CODE_SUCCESS;

_return:

  taosArrayDestroyEx((*pIndex)->pIndex, tFreeSTableIndexInfo);
  taosMemoryFreeClear(*pIndex);

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1133 1134
/**
 * Build a CTG_OP_DROP_TB_INDEX operation and hand it to ctgEnqueue().
 *
 * @param pCtg    catalog the operation applies to
 * @param pName   table name; tNameGetFullDbName() fills the message's dbFName from it and
 *                pName->tname is copied as the table name
 * @param syncOp  when true, ctgEnqueue() blocks until the operation's response semaphore
 *                is posted
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY on allocation failure, or the error
 *         reported by ctgEnqueue()
 */
int32_t ctgDropTbIndexEnqueue(SCatalog *pCtg, SName *pName, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Previously dereferenced unchecked.
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_DROP_TB_INDEX;
  op->syncOp = syncOp;

  SCtgDropTbIndexMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTbIndexMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTbIndexMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  tNameGetFullDbName(pName, msg->dbFName);
  // NOTE(review): unbounded copy - assumes msg->tbName is at least as large as
  // pName->tname; the sibling enqueue helpers use tstrncpy() for the same job.
  strcpy(msg->tbName, pName->tname);

  op->data = msg;

  // From here on ctgEnqueue() is responsible for op and msg.
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1161 1162
/**
 * Build a CTG_OP_CLEAR_CACHE operation and hand it to ctgEnqueue().
 *
 * The operation is always marked unLocked, so ctgEnqueue() does not release or re-take
 * gCtgMgmt.lock while waiting for a synchronous completion.
 *
 * @param pCtg       catalog stored in the message
 * @param freeCtg    stored in the message's freeCtg field
 * @param stopQueue  when true, ctgEnqueue() marks the queue as stopped once this operation
 *                   has been appended
 * @param syncOp     when true, ctgEnqueue() blocks until the operation's response semaphore
 *                   is posted
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY on allocation failure, or the error
 *         reported by ctgEnqueue()
 */
int32_t ctgClearCacheEnqueue(SCatalog *pCtg, bool freeCtg, bool stopQueue, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Previously dereferenced unchecked.
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_CLEAR_CACHE;
  op->syncOp = syncOp;
  op->stopQueue = stopQueue;
  op->unLocked = true;

  SCtgClearCacheMsg *msg = taosMemoryMalloc(sizeof(SCtgClearCacheMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgClearCacheMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  msg->freeCtg = freeCtg;
  op->data = msg;

  // From here on ctgEnqueue() is responsible for op and msg.
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

D
dapan1121 已提交
1189 1190 1191 1192 1193 1194
/**
 * Initialise a rent manager: reset the read index, derive the slot count from the rent
 * period and allocate the zero-filled slot array.
 *
 * @param mgmt     rent manager to initialise
 * @param rentSec  rent period in seconds; the slot count is rentSec / CTG_RENT_SLOT_SECOND
 * @param type     rent type tag stored in mgmt->type (only used for logging in this file section)
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the slot array cannot be allocated
 */
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) {
  mgmt->type = type;
  mgmt->slotRIdx = 0;
  mgmt->slotNum = rentSec / CTG_RENT_SLOT_SECOND;

  size_t slotBytes = sizeof(SCtgRentSlot) * mgmt->slotNum;
  mgmt->slots = taosMemoryCalloc(1, slotBytes);

  if (NULL == mgmt->slots) {
    qError("calloc %d failed", (int32_t)slotBytes);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  qDebug("meta rent initialized, type:%d, slotNum:%d", type, mgmt->slotNum);

  return TSDB_CODE_SUCCESS;
}

/**
 * Append one meta record to the rent slot selected by |id % slotNum|.
 *
 * The slot's array is created on first use with element size `size`, and the slot is flagged
 * as needing a sort. All work happens under the slot's write lock.
 *
 * @param mgmt  rent manager
 * @param meta  record to copy into the slot array
 * @param id    key used to pick the slot (and for logging)
 * @param size  element size used when the slot array has to be created
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the array cannot be created
 *         or grown
 */
int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size) {
  int32_t       code = 0;
  int16_t       slotIdx = abs((int)(id % mgmt->slotNum));
  SCtgRentSlot *pSlot = &mgmt->slots[slotIdx];

  CTG_LOCK(CTG_WRITE, &pSlot->lock);

  // Lazily create the backing array.
  if (NULL == pSlot->meta) {
    pSlot->meta = taosArrayInit(CTG_DEFAULT_RENT_SLOT_SIZE, size);
  }

  if (NULL == pSlot->meta) {
    qError("taosArrayInit %d failed, id:0x%" PRIx64 ", slot idx:%d, type:%d", CTG_DEFAULT_RENT_SLOT_SIZE, id, slotIdx,
           mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  if (NULL == taosArrayPush(pSlot->meta, meta)) {
    qError("taosArrayPush meta to rent failed, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  // The new element was appended at the end, so the order is no longer guaranteed.
  pSlot->needSort = true;

  qDebug("add meta to rent, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);

_return:

  CTG_UNLOCK(CTG_WRITE, &pSlot->lock);
  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1238 1239
/**
 * Overwrite the rent record with key `id`, or add it when it is not there yet.
 *
 * Under the slot's write lock the slot is sorted if needed and searched for `id`; a hit is
 * overwritten with `size` bytes from `meta`. If the slot is empty or the key is missing, the
 * lock is released and the record is inserted through ctgMetaRentAdd() instead.
 *
 * @param mgmt           rent manager
 * @param meta           new record contents
 * @param id             key used to pick the slot and to search it
 * @param size           size in bytes of one record
 * @param sortCompare    comparator passed to taosArraySort()
 * @param searchCompare  comparator passed to taosArraySearch()
 * @return TSDB_CODE_SUCCESS when updated in place, otherwise the result of ctgMetaRentAdd()
 */
int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t sortCompare,
                          __compar_fn_t searchCompare) {
  int32_t       code = 0;
  int16_t       slotIdx = abs((int)(id % mgmt->slotNum));
  SCtgRentSlot *pSlot = &mgmt->slots[slotIdx];
  void         *pFound = NULL;

  CTG_LOCK(CTG_WRITE, &pSlot->lock);

  if (NULL == pSlot->meta) {
    qDebug("empty meta slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  // taosArraySearch() below needs the slot in sorted order.
  if (pSlot->needSort) {
    qDebug("meta slot before sorte, slot idx:%d, type:%d, size:%d", slotIdx, mgmt->type,
           (int32_t)taosArrayGetSize(pSlot->meta));
    taosArraySort(pSlot->meta, sortCompare);
    pSlot->needSort = false;
    qDebug("meta slot sorted, slot idx:%d, type:%d, size:%d", slotIdx, mgmt->type,
           (int32_t)taosArrayGetSize(pSlot->meta));
  }

  pFound = taosArraySearch(pSlot->meta, &id, searchCompare, TD_EQ);
  if (NULL == pFound) {
    qDebug("meta not found in slot, id:0x%" PRIx64 ", slot idx:%d, type:%d, size:%d", id, slotIdx, mgmt->type,
           (int32_t)taosArrayGetSize(pSlot->meta));
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  memcpy(pFound, meta, size);

  qDebug("meta in rent updated, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);

_return:

  CTG_UNLOCK(CTG_WRITE, &pSlot->lock);

  if (0 == code) {
    CTG_RET(code);
  }

  // Not updatable in place: fall back to inserting the record (takes the slot lock again).
  qDebug("meta in rent update failed, will try to add it, code:%x, id:0x%" PRIx64 ", slot idx:%d, type:%d", code, id,
         slotIdx, mgmt->type);
  CTG_RET(ctgMetaRentAdd(mgmt, meta, id, size));
}

/**
 * Remove the rent record with key `id` from its slot.
 *
 * Under the slot's write lock the slot is sorted if needed, searched for `id`, and the
 * matching element is removed.
 *
 * @param mgmt           rent manager
 * @param id             key used to pick the slot and to search it
 * @param sortCompare    comparator passed to taosArraySort()
 * @param searchCompare  comparator passed to taosArraySearchIdx()
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_CTG_INTERNAL_ERROR when the slot is empty or the
 *         key is not present
 */
int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t sortCompare, __compar_fn_t searchCompare) {
  int32_t       code = 0;
  int32_t       pos = -1;
  int16_t       slotIdx = abs((int)(id % mgmt->slotNum));
  SCtgRentSlot *pSlot = &mgmt->slots[slotIdx];

  CTG_LOCK(CTG_WRITE, &pSlot->lock);

  if (NULL == pSlot->meta) {
    qError("empty meta slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  // The index search below needs the slot in sorted order.
  if (pSlot->needSort) {
    taosArraySort(pSlot->meta, sortCompare);
    pSlot->needSort = false;
    qDebug("meta slot sorted, slot idx:%d, type:%d", slotIdx, mgmt->type);
  }

  pos = taosArraySearchIdx(pSlot->meta, &id, searchCompare, TD_EQ);
  if (pos < 0) {
    qError("meta not found in slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  taosArrayRemove(pSlot->meta, pos);

  qDebug("meta in rent removed, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);

_return:

  CTG_UNLOCK(CTG_WRITE, &pSlot->lock);

  CTG_RET(code);
}

/**
 * Copy out every record of the next rent slot.
 *
 * The shared read index is advanced atomically (wrapping at slotNum) and the selected slot is
 * copied under its read lock into a freshly allocated buffer.
 *
 * @param mgmt  rent manager
 * @param res   out: malloc'ed buffer holding *num records of `size` bytes each, to be freed by
 *              the caller; NULL when the slot is empty or on failure
 * @param num   out: number of records in *res; 0 when the slot is empty or on failure
 * @param size  size in bytes of one record
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the buffer cannot be allocated
 */
int32_t ctgMetaRentGetImpl(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
  // Give both outputs a defined value on every path. Previously *res was left untouched for
  // an empty slot and *num on allocation failure, unlike ctgMetaRentGet()'s early return,
  // which resets both.
  *res = NULL;
  *num = 0;

  int16_t ridx = atomic_add_fetch_16(&mgmt->slotRIdx, 1);
  if (ridx >= mgmt->slotNum) {
    ridx %= mgmt->slotNum;
    atomic_store_16(&mgmt->slotRIdx, ridx);
  }

  SCtgRentSlot *slot = &mgmt->slots[ridx];
  int32_t       code = 0;

  CTG_LOCK(CTG_READ, &slot->lock);
  if (NULL == slot->meta) {
    qDebug("empty meta in slot:%d, type:%d", ridx, mgmt->type);
    goto _return;
  }

  size_t metaNum = taosArrayGetSize(slot->meta);
  if (metaNum <= 0) {
    qDebug("no meta in slot:%d, type:%d", ridx, mgmt->type);
    goto _return;
  }

  size_t msize = metaNum * size;
  *res = taosMemoryMalloc(msize);
  if (NULL == *res) {
    qError("malloc %d failed", (int32_t)msize);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  // The array elements are copied as one contiguous run starting at element 0.
  void *meta = taosArrayGet(slot->meta, 0);

  memcpy(*res, meta, msize);

  *num = (uint32_t)metaNum;

  qDebug("Got %d meta from rent, type:%d", (int32_t)metaNum, mgmt->type);

_return:

  CTG_UNLOCK(CTG_READ, &slot->lock);

  CTG_RET(code);
}

/**
 * Rate-limited front end for ctgMetaRentGetImpl().
 *
 * If less than CTG_RENT_SLOT_SECOND seconds passed since mgmt->lastReadMsec, the call returns
 * an empty result. Otherwise the caller that wins the compare-and-swap on lastReadMsec
 * proceeds to read the next slot; a loser re-evaluates with fresh timestamps.
 *
 * @param mgmt  rent manager
 * @param res   out: see ctgMetaRentGetImpl(); NULL when rate-limited
 * @param num   out: see ctgMetaRentGetImpl(); 0 when rate-limited
 * @param size  size in bytes of one record
 * @return TSDB_CODE_SUCCESS, or the error of ctgMetaRentGetImpl()
 */
int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
  for (;;) {
    int64_t nowMs = taosGetTimestampMs();
    int64_t lastMs = atomic_load_64(&mgmt->lastReadMsec);

    if ((nowMs - lastMs) < CTG_RENT_SLOT_SECOND * 1000) {
      *res = NULL;
      *num = 0;
      qDebug("too short time period to get expired meta, type:%d", mgmt->type);
      return TSDB_CODE_SUCCESS;
    }

    // Claim this read period; retry when another caller changed the timestamp first.
    if (lastMs == atomic_val_compare_exchange_64(&mgmt->lastReadMsec, lastMs, nowMs)) {
      break;
    }
  }

  CTG_ERR_RET(ctgMetaRentGetImpl(mgmt, res, num, size));

  return TSDB_CODE_SUCCESS;
}

/**
 * Create a new database cache entry and insert it into pCtg->dbCache.
 *
 * The entry gets a table hash and a super-table hash, both sized by
 * gCtgMgmt.cfg.maxTblCacheNum. After a successful insert, databases that are not system
 * databases are also registered in pCtg->dbRent with vgVersion -1.
 *
 * @param pCtg     catalog to add the entry to
 * @param dbFName  database name used as hash key
 * @param dbId     database id stored in the entry
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY on allocation/insert failure, the
 *         taosHashPut() code when the key already exists, or the error of ctgMetaRentAdd()
 */
int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
  int32_t     code = 0;
  SCtgDBCache newDBCache = {0};

  newDBCache.dbId = dbId;

  newDBCache.tbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY),
                                    true, HASH_ENTRY_LOCK);
  if (NULL == newDBCache.tbCache) {
    ctgError("taosHashInit %d metaCache failed", gCtgMgmt.cfg.maxTblCacheNum);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  newDBCache.stbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT),
                                     true, HASH_ENTRY_LOCK);
  if (NULL == newDBCache.stbCache) {
    ctgError("taosHashInit %d stbCache failed", gCtgMgmt.cfg.maxTblCacheNum);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  code = taosHashPut(pCtg->dbCache, dbFName, strlen(dbFName), &newDBCache, sizeof(SCtgDBCache));
  if (code && HASH_NODE_EXIST(code)) {
    // The local entry was not inserted; release it and report the hash code as-is.
    ctgDebug("db already in cache, dbFName:%s", dbFName);
    goto _return;
  }

  if (code) {
    ctgError("taosHashPut db to cache failed, dbFName:%s", dbFName);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  // The hash now holds its own copy of the entry, so nothing below frees newDBCache.
  CTG_CACHE_NUM_INC(CTG_CI_DB, 1);

  SDbVgVersion vgVersion = {.dbId = newDBCache.dbId, .vgVersion = -1, .stateTs = 0};
  tstrncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));

  ctgDebug("db added to cache, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId);

  if (!IS_SYS_DBNAME(dbFName)) {
    CTG_ERR_RET(ctgMetaRentAdd(&pCtg->dbRent, &vgVersion, dbId, sizeof(SDbVgVersion)));

    ctgDebug("db added to rent, dbFName:%s, vgVersion:%d, dbId:0x%" PRIx64, dbFName, vgVersion.vgVersion, dbId);
  }

  return TSDB_CODE_SUCCESS;

_return:

  ctgFreeDbCache(&newDBCache);

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1440
/**
 * Remove every super table of a database cache from the catalog's stb rent.
 *
 * Walks dbCache->stbCache, whose keys are read as super table uids, and removes each uid
 * from pCtg->stbRent. Removal failures are ignored; only successes are logged.
 *
 * @param pCtg     catalog owning the stb rent
 * @param dbCache  database cache whose stbCache is walked; nothing happens when it is NULL
 */
void ctgRemoveStbRent(SCatalog *pCtg, SCtgDBCache *dbCache) {
  if (NULL == dbCache->stbCache) {
    return;
  }

  for (void *pIter = taosHashIterate(dbCache->stbCache, NULL); NULL != pIter;
       pIter = taosHashIterate(dbCache->stbCache, pIter)) {
    uint64_t *suid = taosHashGetKey(pIter, NULL);

    if (TSDB_CODE_SUCCESS !=
        ctgMetaRentRemove(&pCtg->stbRent, *suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare)) {
      continue;
    }

    ctgDebug("stb removed from rent, suid:0x%" PRIx64, *suid);
  }
}

dengyihao's avatar
dengyihao 已提交
1459
/**
 * Tear down a database cache entry and remove it from the catalog.
 *
 * Order of work: under the entry's write lock it is flagged deleted, its super tables are
 * taken out of the stb rent and its contents are freed; then the database is removed from
 * the db rent and finally from pCtg->dbCache.
 *
 * @param pCtg     catalog owning the entry
 * @param dbCache  entry to remove
 * @param dbFName  hash key of the entry in pCtg->dbCache
 * @return TSDB_CODE_SUCCESS, the error of ctgMetaRentRemove(), or TSDB_CODE_CTG_DB_DROPPED
 *         when the hash removal fails
 *
 * NOTE(review): when ctgMetaRentRemove() fails the function returns before taosHashRemove(),
 * leaving the already-freed entry in pCtg->dbCache - confirm this is intended (e.g. for
 * databases that were never added to the rent).
 */
int32_t ctgRemoveDBFromCache(SCatalog *pCtg, SCtgDBCache *dbCache, const char *dbFName) {
  // Remember the id now: the entry's contents are freed below.
  uint64_t dbId = dbCache->dbId;

  ctgInfo("start to remove db from cache, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbCache->dbId);

  CTG_LOCK(CTG_WRITE, &dbCache->dbLock);

  atomic_store_8(&dbCache->deleted, 1);
  ctgRemoveStbRent(pCtg, dbCache);
  ctgFreeDbCache(dbCache);

  CTG_UNLOCK(CTG_WRITE, &dbCache->dbLock);

  CTG_ERR_RET(ctgMetaRentRemove(&pCtg->dbRent, dbId, ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare));
  ctgDebug("db removed from rent, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId);

  if (0 != taosHashRemove(pCtg->dbCache, dbFName, strlen(dbFName))) {
    ctgInfo("taosHashRemove from dbCache failed, may be removed, dbFName:%s", dbFName);
    CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
  }

  CTG_CACHE_NUM_DEC(CTG_CI_DB, 1);
  ctgInfo("db removed from cache, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1486 1487
/**
 * Return the cache entry for a database, creating (or re-creating) it when needed.
 *
 * An existing entry is reused when the caller passes dbId 0, when the entry has no id yet
 * (it then adopts dbId), or when the ids match. An entry with a different id is removed and
 * replaced by a fresh one.
 *
 * @param pCtg     catalog to search / extend
 * @param dbFName  database name used as cache key
 * @param dbId     expected database id, 0 for "any"
 * @param pCache   out: the entry looked up by ctgGetDBCache()
 * @return TSDB_CODE_SUCCESS, or the error of ctgRemoveDBFromCache() / ctgAddNewDBCache()
 */
int32_t ctgGetAddDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId, SCtgDBCache **pCache) {
  int32_t      code = 0;
  SCtgDBCache *dbCache = NULL;
  ctgGetDBCache(pCtg, dbFName, &dbCache);

  if (dbCache) {
    // TODO OPEN IT
#if 0    
    if (dbCache->dbId == dbId) {
      *pCache = dbCache;
      return TSDB_CODE_SUCCESS;
    }
#else
    bool reusable = (0 == dbId) || (0 == dbCache->dbId) || (dbCache->dbId == dbId);
    if (reusable) {
      // An entry without an id adopts the caller's id (a no-op when dbId is 0).
      if (dbId && (0 == dbCache->dbId)) {
        dbCache->dbId = dbId;
      }

      *pCache = dbCache;
      return TSDB_CODE_SUCCESS;
    }
#endif
    // Same name but a different id: drop the stale entry before adding a new one.
    CTG_ERR_RET(ctgRemoveDBFromCache(pCtg, dbCache, dbFName));
  }

  CTG_ERR_RET(ctgAddNewDBCache(pCtg, dbFName, dbId));

  ctgGetDBCache(pCtg, dbFName, &dbCache);

  *pCache = dbCache;

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1527 1528
/*
 * Publish the current versions of a super table into pCtg->stbRent.
 *
 * The rent record is keyed by suid. sversion/tversion are taken from
 * pCache->pMeta and smaVer from pCache->pIndex; whichever of the two is
 * absent leaves its fields zero-initialised.
 */
int32_t ctgUpdateRentStbVersion(SCatalog *pCtg, char *dbFName, char *tbName, uint64_t dbId, uint64_t suid,
                                SCtgTbCache *pCache) {
  SSTableVersion stbVer = {.dbId = dbId, .suid = suid};

  tstrncpy(stbVer.dbFName, dbFName, sizeof(stbVer.dbFName));
  tstrncpy(stbVer.stbName, tbName, sizeof(stbVer.stbName));

  if (NULL != pCache->pMeta) {
    stbVer.sversion = pCache->pMeta->sversion;
    stbVer.tversion = pCache->pMeta->tversion;
  }

  if (NULL != pCache->pIndex) {
    stbVer.smaVer = pCache->pIndex->version;
  }

  CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->stbRent, &stbVer, stbVer.suid, sizeof(stbVer), ctgStbVersionSortCompare,
                                ctgStbVersionSearchCompare));

  ctgDebug("db %s,0x%" PRIx64 " stb %s,0x%" PRIx64 " sver %d tver %d smaVer %d updated to stbRent", dbFName, dbId,
           tbName, suid, stbVer.sversion, stbVer.tversion, stbVer.smaVer);

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
1550

dengyihao's avatar
dengyihao 已提交
1551 1552
/*
 * Store a table meta in dbCache->tbCache under tbName, replacing an older copy.
 *
 * Ownership of `meta` passes to this function: it is freed when the db is
 * being dropped, when the update is ignored, or when inserting a new cache
 * entry fails; otherwise it becomes the entry's pMeta.
 *
 * An update is ignored when the cached meta has the same type and uid and is
 * either a child table or already has sversion/tversion at least as new.
 * For super tables the suid -> name mapping in dbCache->stbCache and the
 * stbRent record are refreshed as well.
 *
 * metaSize is accepted for interface compatibility; it is not read here.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_CTG_DB_DROPPED, TSDB_CODE_OUT_OF_MEMORY
 * or whatever ctgUpdateRentStbVersion() reports.
 */
int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, uint64_t dbId, char *tbName,
                              STableMeta *meta, int32_t metaSize) {
  if (NULL == dbCache->tbCache || NULL == dbCache->stbCache) {
    taosMemoryFree(meta);
    ctgError("db is dropping, dbId:0x%" PRIx64, dbCache->dbId);
    CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
  }

  bool         isStb = meta->tableType == TSDB_SUPER_TABLE;
  SCtgTbCache *pCache = taosHashGet(dbCache->tbCache, tbName, strlen(tbName));
  STableMeta  *orig = (pCache ? pCache->pMeta : NULL);
  int8_t       origType = 0;

  if (orig) {
    origType = orig->tableType;

    if (origType == meta->tableType && orig->uid == meta->uid &&
        (origType == TSDB_CHILD_TABLE || (orig->sversion >= meta->sversion && orig->tversion >= meta->tversion))) {
      taosMemoryFree(meta);
      ctgDebug("ignore table %s meta update", tbName);
      return TSDB_CODE_SUCCESS;
    }

    // The old super-table mapping is dropped here; the new one (if any) is added further down.
    if (origType == TSDB_SUPER_TABLE) {
      if (taosHashRemove(dbCache->stbCache, &orig->suid, sizeof(orig->suid))) {
        ctgError("stb not exist in stbCache, dbFName:%s, stb:%s, suid:0x%" PRIx64, dbFName, tbName, orig->suid);
      } else {
        ctgDebug("stb removed from stbCache, dbFName:%s, stb:%s, suid:0x%" PRIx64, dbFName, tbName, orig->suid);
      }
    }
  }

  if (NULL == pCache) {
    SCtgTbCache cache = {0};
    cache.pMeta = meta;
    if (taosHashPut(dbCache->tbCache, tbName, strlen(tbName), &cache, sizeof(SCtgTbCache)) != 0) {
      ctgError("taosHashPut new tbCache failed, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
      taosMemoryFree(meta);
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    // Re-fetch the stored copy; it is NULL-checked before the rent update below.
    pCache = taosHashGet(dbCache->tbCache, tbName, strlen(tbName));
  } else {
    CTG_LOCK(CTG_WRITE, &pCache->metaLock);
    if (orig) {
      CTG_META_NUM_DEC(origType);
    }
    taosMemoryFree(pCache->pMeta);
    pCache->pMeta = meta;
    CTG_UNLOCK(CTG_WRITE, &pCache->metaLock);
  }

  // `meta` is the pointer just stored in the entry, so read the type from it
  // instead of dereferencing pCache, which the re-lookup above may leave NULL.
  CTG_META_NUM_INC(meta->tableType);

  ctgDebug("tbmeta updated to cache, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
  ctgdShowTableMeta(pCtg, tbName, meta);

  if (!isStb) {
    return TSDB_CODE_SUCCESS;
  }

  if (taosHashPut(dbCache->stbCache, &meta->suid, sizeof(meta->suid), tbName, strlen(tbName) + 1) != 0) {
    ctgError("taosHashPut to stable cache failed, suid:0x%" PRIx64, meta->suid);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  ctgDebug("stb 0x%" PRIx64 " updated to cache, dbFName:%s, tbName:%s, tbType:%d", meta->suid, dbFName, tbName,
           meta->tableType);

  if (pCache) {
    CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbId, meta->suid, pCache));
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1627
/*
 * Store a table index (*index) in dbCache->tbCache under tbName.
 *
 * Ownership: *index is consumed on every path. It is either stored in the
 * cache entry or freed, and *index is set to NULL in both cases.
 *
 * When an entry already exists its previous index is destroyed; if the new
 * index carries suid 0 the previous suid is reused for the stbRent update.
 * The rent is only updated when the resulting suid is non-zero.
 *
 * NOTE: callers in this file pass dbFName/tbName pointing into **index, so
 * those strings must not be read after the index has been freed.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_CTG_DB_DROPPED, TSDB_CODE_OUT_OF_MEMORY
 * or whatever ctgUpdateRentStbVersion() reports.
 */
int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, char *tbName, STableIndex **index) {
  if (NULL == dbCache->tbCache) {
    ctgFreeSTableIndex(*index);
    taosMemoryFreeClear(*index);
    ctgError("db is dropping, dbId:0x%" PRIx64, dbCache->dbId);
    CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
  }

  STableIndex *pIndex = *index;
  uint64_t     suid = pIndex->suid;
  SCtgTbCache *pCache = taosHashGet(dbCache->tbCache, tbName, strlen(tbName));
  if (NULL == pCache) {
    SCtgTbCache cache = {0};
    cache.pIndex = pIndex;

    if (taosHashPut(dbCache->tbCache, tbName, strlen(tbName), &cache, sizeof(cache)) != 0) {
      // Log first: tbName may live inside the index that is about to be freed.
      ctgError("taosHashPut new tbCache failed, tbName:%s", tbName);
      ctgFreeSTableIndex(*index);
      taosMemoryFreeClear(*index);
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    CTG_DB_NUM_INC(CTG_CI_TBL_SMA);

    *index = NULL;
    ctgDebug("table %s index updated to cache, ver:%d, num:%d", tbName, pIndex->version,
             (int32_t)taosArrayGetSize(pIndex->pIndex));

    if (suid) {
      // The local copy carries the same pIndex as the stored entry, which is all the rent update reads here.
      CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbCache->dbId, pIndex->suid, &cache));
    }

    return TSDB_CODE_SUCCESS;
  }

  CTG_LOCK(CTG_WRITE, &pCache->indexLock);

  if (pCache->pIndex) {
    // Keep the previously known suid when the replacement does not carry one.
    if (0 == suid) {
      suid = pCache->pIndex->suid;
    }
    taosArrayDestroyEx(pCache->pIndex->pIndex, tFreeSTableIndexInfo);
    taosMemoryFreeClear(pCache->pIndex);
  }

  pCache->pIndex = pIndex;
  CTG_UNLOCK(CTG_WRITE, &pCache->indexLock);

  *index = NULL;

  ctgDebug("table %s index updated to cache, ver:%d, num:%d", tbName, pIndex->version,
           (int32_t)taosArrayGetSize(pIndex->pIndex));

  if (suid) {
    CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbCache->dbId, suid, pCache));
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1687 1688 1689 1690
/*
 * Clone pOut and queue the clone as a table-meta cache update.
 *
 * The local pointer to the clone is cleared right after the enqueue call,
 * whatever its result, so the cleanup at _return always receives NULL.
 */
int32_t ctgUpdateTbMetaToCache(SCatalog *pCtg, STableMetaOutput *pOut, bool syncReq) {
  int32_t           code = 0;
  STableMetaOutput *pClone = NULL;

  CTG_ERR_RET(ctgCloneMetaOutput(pOut, &pClone));

  code = ctgUpdateTbMetaEnqueue(pCtg, pClone, syncReq);
  pClone = NULL;  // never touched again by this function after the enqueue attempt
  CTG_ERR_JRET(code);

  return TSDB_CODE_SUCCESS;

_return:

  ctgFreeSTableMetaOutput(pClone);
  CTG_RET(code);
}

D
dapan1121 已提交
1704
void ctgClearAllInstance(void) {
dengyihao's avatar
dengyihao 已提交
1705
  SCatalog *pCtg = NULL;
1706

dengyihao's avatar
dengyihao 已提交
1707
  void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
1708
  while (pIter) {
dengyihao's avatar
dengyihao 已提交
1709
    pCtg = *(SCatalog **)pIter;
1710 1711

    if (pCtg) {
D
dapan1121 已提交
1712 1713 1714 1715 1716 1717 1718 1719
      ctgClearHandle(pCtg);
    }

    pIter = taosHashIterate(gCtgMgmt.pCluster, pIter);
  }
}

void ctgFreeAllInstance(void) {
dengyihao's avatar
dengyihao 已提交
1720
  SCatalog *pCtg = NULL;
D
dapan1121 已提交
1721

dengyihao's avatar
dengyihao 已提交
1722
  void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
D
dapan1121 已提交
1723
  while (pIter) {
dengyihao's avatar
dengyihao 已提交
1724
    pCtg = *(SCatalog **)pIter;
D
dapan1121 已提交
1725 1726 1727

    if (pCtg) {
      ctgFreeHandle(pCtg);
1728 1729 1730 1731 1732 1733 1734 1735
    }

    pIter = taosHashIterate(gCtgMgmt.pCluster, pIter);
  }

  taosHashClear(gCtgMgmt.pCluster);
}

X
Xiaoyu Wang 已提交
1736 1737 1738
int32_t ctgVgInfoIdComp(void const *lp, void const *rp) {
  int32_t     *key = (int32_t *)lp;
  SVgroupInfo *pVg = (SVgroupInfo *)rp;
1739 1740 1741 1742 1743 1744 1745 1746 1747 1748

  if (*key < pVg->vgId) {
    return -1;
  } else if (*key > pVg->vgId) {
    return 1;
  }

  return 0;
}

D
dapan1121 已提交
1749
/*
 * Cache operation: install msg->dbInfo as the vgroup info of msg->dbFName.
 *
 * The update is silently skipped when the catalog has stopped updating, when
 * the message carries no vgroup info/hash, or when the cached info is newer
 * or identical (same vgVersion, numOfTable and stateTs). On installation the
 * previous info is freed, ownership of msg->dbInfo moves to the cache and the
 * db rent record is refreshed.
 *
 * msg (operation->data) and any dbInfo still attached to it are always freed
 * before returning.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_APP_ERROR for an invalid vgroup info,
 * TSDB_CODE_CTG_INTERNAL_ERROR when no db cache entry could be obtained, or
 * an error from the helpers called.
 */
int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
  int32_t          code = 0;
  SCtgUpdateVgMsg *msg = operation->data;
  SDBVgInfo       *dbInfo = msg->dbInfo;
  char            *dbFName = msg->dbFName;
  SCatalog        *pCtg = msg->pCtg;

  // dbInfo is checked for NULL before its vgHash is read.
  if (pCtg->stopUpdate || NULL == dbInfo || NULL == dbInfo->vgHash) {
    goto _return;
  }

  if (dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) {
    ctgDebug("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d, vgHashSize:%d", dbFName, dbInfo->vgHash,
             dbInfo->vgVersion, taosHashGetSize(dbInfo->vgHash));
    CTG_ERR_JRET(TSDB_CODE_APP_ERROR);
  }

  // Snapshot taken now because dbInfo is handed over to the cache further down.
  SDbVgVersion vgVersion = {
      .dbId = msg->dbId, .vgVersion = dbInfo->vgVersion, .numOfTable = dbInfo->numOfTable, .stateTs = dbInfo->stateTs};

  SCtgDBCache *dbCache = NULL;
  CTG_ERR_JRET(ctgGetAddDBCache(msg->pCtg, dbFName, msg->dbId, &dbCache));
  if (NULL == dbCache) {
    ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:0x%" PRIx64, dbFName, msg->dbId);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  SCtgVgCache *vgCache = &dbCache->vgCache;
  CTG_ERR_JRET(ctgWLockVgInfo(msg->pCtg, dbCache));

  if (vgCache->vgInfo) {
    SDBVgInfo *vgInfo = vgCache->vgInfo;

    if (dbInfo->vgVersion < vgInfo->vgVersion) {
      ctgDebug("db updateVgroup is ignored, dbFName:%s, vgVer:%d, curVer:%d", dbFName, dbInfo->vgVersion,
               vgInfo->vgVersion);
      ctgWUnlockVgInfo(dbCache);

      goto _return;
    }

    if (dbInfo->vgVersion == vgInfo->vgVersion && dbInfo->numOfTable == vgInfo->numOfTable &&
        dbInfo->stateTs == vgInfo->stateTs) {
      ctgDebug("no new db vgroup update info, dbFName:%s, vgVer:%d, numOfTable:%d, stateTs:%" PRId64, dbFName,
               dbInfo->vgVersion, dbInfo->numOfTable, dbInfo->stateTs);
      ctgWUnlockVgInfo(dbCache);

      goto _return;
    }

    freeVgInfo(vgInfo);
    CTG_DB_NUM_RESET(CTG_CI_DB_VGROUP);
  }

  // Hand the new info over to the cache; clearing msg->dbInfo keeps _return from freeing it.
  vgCache->vgInfo = dbInfo;
  msg->dbInfo = NULL;
  CTG_DB_NUM_SET(CTG_CI_DB_VGROUP);

  ctgDebug("db vgInfo updated, dbFName:%s, vgVer:%d, stateTs:%" PRId64 ", dbId:0x%" PRIx64, dbFName,
           vgVersion.vgVersion, vgVersion.stateTs, vgVersion.dbId);

  ctgWUnlockVgInfo(dbCache);

  dbCache = NULL;

  // if (!IS_SYS_DBNAME(dbFName)) {
  tstrncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
  CTG_ERR_JRET(ctgMetaRentUpdate(&msg->pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion),
                                 ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare));
  //}

_return:

  freeVgInfo(msg->dbInfo);
  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
1829
/*
 * Cache operation: remove the cache entry of msg->dbFName.
 *
 * Nothing happens when the catalog has stopped updating, when the db is not
 * cached, or when msg->dbId is non-zero and differs from the cached id.
 * msg (operation->data) is always freed before returning.
 */
int32_t ctgOpDropDbCache(SCtgCacheOperation *operation) {
  int32_t        code = 0;
  SCtgDropDBMsg *msg = operation->data;
  SCatalog      *pCtg = msg->pCtg;
  SCtgDBCache   *dbCache = NULL;

  if (!pCtg->stopUpdate) {
    ctgGetDBCache(pCtg, msg->dbFName, &dbCache);
  }

  // Covers both "updates stopped" (no lookup was made) and "db not cached".
  if (NULL == dbCache) {
    goto _return;
  }

  if (0 != msg->dbId && msg->dbId != dbCache->dbId) {
    ctgInfo("dbId already updated, dbFName:%s, dbId:0x%" PRIx64 ", targetId:0x%" PRIx64, msg->dbFName, dbCache->dbId,
            msg->dbId);
    goto _return;
  }

  CTG_ERR_JRET(ctgRemoveDBFromCache(pCtg, dbCache, msg->dbFName));

_return:

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
1859
/*
 * Cache operation: discard the cached vgroup info of msg->dbFName while
 * keeping the db cache entry itself.
 *
 * Nothing happens when the catalog has stopped updating or the db is not
 * cached. msg (operation->data) is always freed before returning.
 */
int32_t ctgOpDropDbVgroup(SCtgCacheOperation *operation) {
  int32_t              code = 0;
  SCtgDropDbVgroupMsg *msg = operation->data;
  SCatalog            *pCtg = msg->pCtg;
  SCtgDBCache         *dbCache = NULL;

  if (!pCtg->stopUpdate) {
    ctgGetDBCache(pCtg, msg->dbFName, &dbCache);
  }

  // Covers both "updates stopped" (no lookup was made) and "db not cached".
  if (NULL == dbCache) {
    goto _return;
  }

  CTG_ERR_JRET(ctgWLockVgInfo(pCtg, dbCache));

  SCtgVgCache *vgCache = &dbCache->vgCache;
  freeVgInfo(vgCache->vgInfo);
  vgCache->vgInfo = NULL;

  CTG_DB_NUM_RESET(CTG_CI_DB_VGROUP);
  ctgDebug("db vgInfo removed, dbFName:%s", msg->dbFName);

  ctgWUnlockVgInfo(dbCache);

_return:

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
1891
/*
 * Cache operation: write the table meta carried by msg->pMeta into the cache.
 *
 * Depending on pMeta->metaType the table meta (tbName), the child-table meta
 * (ctbName) or both are written. The child-table meta is copied into a fresh
 * allocation before it is handed to ctgWriteTbMetaToCache().
 *
 * pMeta, any tbMeta still attached to it, and msg are always freed before
 * returning.
 */
int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *operation) {
  int32_t              code = 0;
  SCtgUpdateTbMetaMsg *msg = operation->data;
  SCatalog            *pCtg = msg->pCtg;
  STableMetaOutput    *pMeta = msg->pMeta;
  SCtgDBCache         *dbCache = NULL;
  bool                 isBoth = false;
  bool                 isChild = false;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  isBoth = CTG_IS_META_BOTH(pMeta->metaType);
  isChild = CTG_IS_META_CTABLE(pMeta->metaType);

  // Everything except a pure child-table response has to carry a table meta.
  if (!isChild && NULL == pMeta->tbMeta) {
    ctgError("no valid tbmeta got from meta rsp, dbFName:%s, tbName:%s", pMeta->dbFName, pMeta->tbName);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  if (isBoth && TSDB_SUPER_TABLE != pMeta->tbMeta->tableType) {
    ctgError("table type error, expected:%d, actual:%d", TSDB_SUPER_TABLE, pMeta->tbMeta->tableType);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  CTG_ERR_JRET(ctgGetAddDBCache(pCtg, pMeta->dbFName, pMeta->dbId, &dbCache));
  if (NULL == dbCache) {
    ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:0x%" PRIx64, pMeta->dbFName, pMeta->dbId);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  if (isBoth || CTG_IS_META_TABLE(pMeta->metaType)) {
    int32_t tbMetaSize = CTG_META_SIZE(pMeta->tbMeta);

    code = ctgWriteTbMetaToCache(pCtg, dbCache, pMeta->dbFName, pMeta->dbId, pMeta->tbName, pMeta->tbMeta, tbMetaSize);
    // Detach tbMeta whatever the outcome so the cleanup below does not free it.
    pMeta->tbMeta = NULL;
    CTG_ERR_JRET(code);
  }

  if (isBoth || isChild) {
    SCTableMeta *ctbCopy = taosMemoryMalloc(sizeof(SCTableMeta));
    if (NULL == ctbCopy) {
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }

    memcpy(ctbCopy, &pMeta->ctbMeta, sizeof(SCTableMeta));
    CTG_ERR_JRET(ctgWriteTbMetaToCache(pCtg, dbCache, pMeta->dbFName, pMeta->dbId, pMeta->ctbName,
                                       (STableMeta *)ctbCopy, sizeof(SCTableMeta)));
  }

_return:

  taosMemoryFreeClear(pMeta->tbMeta);
  taosMemoryFreeClear(pMeta);

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
1945
/*
 * Cache operation: remove a super table (msg->stbName / msg->suid) from the
 * stbCache, the tbCache and the stb rent of msg->dbFName.
 *
 * Nothing happens when the catalog has stopped updating, when the db is not
 * cached, or when msg->dbId is non-zero and differs from the cached id.
 * msg (operation->data) is always freed before returning.
 *
 * A tbCache entry may hold only an index and no meta (ctgWriteTbIndexToCache
 * creates such entries), so pMeta is checked before its table type is read
 * and the meta counter is only decremented when a meta was actually cached.
 */
int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) {
  int32_t             code = 0;
  SCtgDropStbMetaMsg *msg = operation->data;
  SCatalog           *pCtg = msg->pCtg;
  int32_t             tblType = 0;
  bool                hadMeta = false;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  SCtgDBCache *dbCache = NULL;
  ctgGetDBCache(pCtg, msg->dbFName, &dbCache);
  if (NULL == dbCache) {
    goto _return;
  }

  if (msg->dbId && (dbCache->dbId != msg->dbId)) {
    ctgDebug("dbId already modified, dbFName:%s, current:0x%" PRIx64 ", dbId:0x%" PRIx64 ", stb:%s, suid:0x%" PRIx64,
             msg->dbFName, dbCache->dbId, msg->dbId, msg->stbName, msg->suid);
    goto _return;
  }

  if (taosHashRemove(dbCache->stbCache, &msg->suid, sizeof(msg->suid))) {
    ctgDebug("stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:0x%" PRIx64, msg->dbFName,
             msg->stbName, msg->suid);
  }

  SCtgTbCache *pTbCache = taosHashGet(dbCache->tbCache, msg->stbName, strlen(msg->stbName));
  if (NULL == pTbCache) {
    ctgDebug("stb %s already not in cache", msg->stbName);
    goto _return;
  }

  CTG_LOCK(CTG_WRITE, &pTbCache->metaLock);
  // Index-only entries carry no meta; remember whether there is one to account for.
  if (NULL != pTbCache->pMeta) {
    hadMeta = true;
    tblType = pTbCache->pMeta->tableType;
  }
  ctgFreeTbCacheImpl(pTbCache);
  CTG_UNLOCK(CTG_WRITE, &pTbCache->metaLock);

  if (taosHashRemove(dbCache->tbCache, msg->stbName, strlen(msg->stbName))) {
    ctgError("stb not exist in cache, dbFName:%s, stb:%s, suid:0x%" PRIx64, msg->dbFName, msg->stbName, msg->suid);
  } else if (hadMeta) {
    CTG_META_NUM_DEC(tblType);
  }

  ctgInfo("stb removed from cache, dbFName:%s, stbName:%s, suid:0x%" PRIx64, msg->dbFName, msg->stbName, msg->suid);

  CTG_ERR_JRET(ctgMetaRentRemove(&msg->pCtg->stbRent, msg->suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare));

  ctgDebug("stb removed from rent, dbFName:%s, stbName:%s, suid:0x%" PRIx64, msg->dbFName, msg->stbName, msg->suid);

_return:

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
2002
/*
 * Cache operation: remove table msg->tbName from the tbCache of msg->dbFName.
 *
 * Nothing happens when the catalog has stopped updating, when the db is not
 * cached, when the cached db id differs from msg->dbId, or when the table is
 * not cached. msg (operation->data) is always freed before returning.
 *
 * A tbCache entry may hold only an index and no meta (ctgWriteTbIndexToCache
 * creates such entries), so pMeta is checked before its table type is read
 * and the meta counter is only decremented when a meta was actually cached.
 *
 * Returns TSDB_CODE_CTG_INTERNAL_ERROR when the entry vanished between lookup
 * and removal, TSDB_CODE_SUCCESS otherwise.
 */
int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) {
  int32_t             code = 0;
  SCtgDropTblMetaMsg *msg = operation->data;
  SCatalog           *pCtg = msg->pCtg;
  int32_t             tblType = 0;
  bool                hadMeta = false;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  SCtgDBCache *dbCache = NULL;
  ctgGetDBCache(pCtg, msg->dbFName, &dbCache);
  if (NULL == dbCache) {
    goto _return;
  }

  if (dbCache->dbId != msg->dbId) {
    ctgDebug("dbId 0x%" PRIx64 " not match with curId 0x%" PRIx64 ", dbFName:%s, tbName:%s", msg->dbId, dbCache->dbId,
             msg->dbFName, msg->tbName);
    goto _return;
  }

  SCtgTbCache *pTbCache = taosHashGet(dbCache->tbCache, msg->tbName, strlen(msg->tbName));
  if (NULL == pTbCache) {
    ctgDebug("tb %s already not in cache", msg->tbName);
    goto _return;
  }

  CTG_LOCK(CTG_WRITE, &pTbCache->metaLock);
  // Index-only entries carry no meta; remember whether there is one to account for.
  if (NULL != pTbCache->pMeta) {
    hadMeta = true;
    tblType = pTbCache->pMeta->tableType;
  }
  ctgFreeTbCacheImpl(pTbCache);
  CTG_UNLOCK(CTG_WRITE, &pTbCache->metaLock);

  if (taosHashRemove(dbCache->tbCache, msg->tbName, strlen(msg->tbName))) {
    ctgError("tb %s not exist in cache, dbFName:%s", msg->tbName, msg->dbFName);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  } else if (hadMeta) {
    CTG_META_NUM_DEC(tblType);
  }

  ctgDebug("table %s removed from cache, dbFName:%s", msg->tbName, msg->dbFName);

_return:

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
2051
/*
 * Cache operation: insert or refresh the cached auth info of msg->userAuth.user.
 *
 * New user: msg->userAuth is copied into a fresh cache record. On a
 * successful insert the message is freed WITHOUT cleaning its hash tables,
 * because the cached copy now references them.
 *
 * Known user: under the record's write lock each cached privilege hash is
 * cleaned up and replaced by the one from the message, whose pointer is then
 * cleared so the common cleanup does not touch it again.
 *
 * The _return path cleans whatever hashes are still attached to msg and
 * frees msg.
 */
int32_t ctgOpUpdateUser(SCtgCacheOperation *operation) {
  int32_t            code = 0;
  SCtgUpdateUserMsg *msg = operation->data;
  SCatalog          *pCtg = msg->pCtg;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  SCtgUserAuth *pUser = (SCtgUserAuth *)taosHashGet(pCtg->userCache, msg->userAuth.user, strlen(msg->userAuth.user));
  if (NULL == pUser) {
    SCtgUserAuth newUser = {0};

    memcpy(&newUser.userAuth, &msg->userAuth, sizeof(msg->userAuth));

    if (taosHashPut(pCtg->userCache, msg->userAuth.user, strlen(msg->userAuth.user), &newUser, sizeof(newUser))) {
      ctgError("taosHashPut user %s to cache failed", msg->userAuth.user);
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }

    taosMemoryFreeClear(msg);

    CTG_CACHE_NUM_INC(CTG_CI_USER, 1);

    return TSDB_CODE_SUCCESS;
  }

// Replace one cached privilege hash with the message's and detach it from the message.
#define CTG_TAKE_USER_HASH(_field)                \
  do {                                            \
    taosHashCleanup(pUser->userAuth._field);      \
    pUser->userAuth._field = msg->userAuth._field; \
    msg->userAuth._field = NULL;                  \
  } while (0)

  CTG_LOCK(CTG_WRITE, &pUser->lock);

  CTG_TAKE_USER_HASH(createdDbs);
  CTG_TAKE_USER_HASH(readDbs);
  CTG_TAKE_USER_HASH(writeDbs);
  CTG_TAKE_USER_HASH(readTbs);
  CTG_TAKE_USER_HASH(writeTbs);
  CTG_TAKE_USER_HASH(useDbs);

  CTG_UNLOCK(CTG_WRITE, &pUser->lock);

#undef CTG_TAKE_USER_HASH

_return:

  taosHashCleanup(msg->userAuth.createdDbs);
  taosHashCleanup(msg->userAuth.readDbs);
  taosHashCleanup(msg->userAuth.writeDbs);
  taosHashCleanup(msg->userAuth.readTbs);
  taosHashCleanup(msg->userAuth.writeTbs);
  taosHashCleanup(msg->userAuth.useDbs);

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
2120
/*
 * Cache operation: overwrite the endpoint set of vgroup msg->vgId in the
 * cached vgroup info of msg->dbFName.
 *
 * The vgroup must be present in both vgInfo->vgHash and vgInfo->vgArray;
 * both copies receive msg->epSet. The update is skipped when the catalog has
 * stopped updating, the db or its vgroup info is not cached, or the vgroup
 * is missing from either container.
 *
 * The vgroup write lock is released at _return only when code is still
 * TSDB_CODE_SUCCESS and a db cache entry was found. msg is always freed.
 */
int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation) {
  int32_t             code = 0;
  SCtgUpdateEpsetMsg *msg = operation->data;
  SCatalog           *pCtg = msg->pCtg;
  SCtgDBCache        *dbCache = NULL;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  CTG_ERR_JRET(ctgGetDBCache(pCtg, msg->dbFName, &dbCache));
  if (NULL == dbCache) {
    ctgDebug("db %s not exist, ignore epset update", msg->dbFName);
    goto _return;
  }

  CTG_ERR_JRET(ctgWLockVgInfo(pCtg, dbCache));

  SDBVgInfo *vgInfo = dbCache->vgCache.vgInfo;
  if (NULL == vgInfo) {
    ctgDebug("vgroup in db %s not cached, ignore epset update", msg->dbFName);
    goto _return;
  }

  SVgroupInfo *pHashVg = taosHashGet(vgInfo->vgHash, &msg->vgId, sizeof(msg->vgId));
  if (NULL == pHashVg) {
    ctgDebug("no vgroup %d in db %s vgHash, ignore epset update", msg->vgId, msg->dbFName);
    goto _return;
  }

  SVgroupInfo *pArrayVg = taosArraySearch(vgInfo->vgArray, &msg->vgId, ctgVgInfoIdComp, TD_EQ);
  if (NULL == pArrayVg) {
    ctgDebug("no vgroup %d in db %s vgArray, ignore epset update", msg->vgId, msg->dbFName);
    goto _return;
  }

  // Endpoints currently in use before and after the update, for the log line only.
  SEp *pCurEp = &pHashVg->epSet.eps[pHashVg->epSet.inUse];
  SEp *pNextEp = &msg->epSet.eps[msg->epSet.inUse];
  ctgDebug("vgroup %d epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d, dbFName:%s in ctg", pHashVg->vgId,
           pHashVg->epSet.inUse, pHashVg->epSet.numOfEps, pCurEp->fqdn, pCurEp->port, msg->epSet.inUse,
           msg->epSet.numOfEps, pNextEp->fqdn, pNextEp->port, msg->dbFName);

  pHashVg->epSet = msg->epSet;
  pArrayVg->epSet = msg->epSet;

_return:

  if (TSDB_CODE_SUCCESS == code && NULL != dbCache) {
    ctgWUnlockVgInfo(dbCache);
  }

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
2176
/*
 * Cache operation: store msg->pIndex as the cached index of its table.
 *
 * The db cache entry is looked up (or created) with dbId 0, i.e. without
 * checking the id of an existing entry. ctgGetAddDBCache() may succeed and
 * still leave dbCache NULL; that case is rejected here, as the other callers
 * in this file do, because ctgWriteTbIndexToCache() dereferences dbCache.
 *
 * ctgWriteTbIndexToCache() clears pIndex once it has consumed the index, so
 * the cleanup below only frees an index that never reached it. msg is always
 * freed.
 */
int32_t ctgOpUpdateTbIndex(SCtgCacheOperation *operation) {
  int32_t               code = 0;
  SCtgUpdateTbIndexMsg *msg = operation->data;
  SCatalog             *pCtg = msg->pCtg;
  STableIndex          *pIndex = msg->pIndex;
  SCtgDBCache          *dbCache = NULL;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  CTG_ERR_JRET(ctgGetAddDBCache(pCtg, pIndex->dbFName, 0, &dbCache));
  if (NULL == dbCache) {
    ctgInfo("conflict db update, ignore this update, dbFName:%s", pIndex->dbFName);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  CTG_ERR_JRET(ctgWriteTbIndexToCache(pCtg, dbCache, pIndex->dbFName, pIndex->tbName, &pIndex));

_return:

  if (pIndex) {
    taosArrayDestroyEx(pIndex->pIndex, tFreeSTableIndexInfo);
    taosMemoryFreeClear(pIndex);
  }

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

/*
 * Cache operation: invalidate the cached index of msg->tbName by replacing it
 * with an empty placeholder index whose version is -1.
 *
 * Nothing happens when the catalog has stopped updating or the db is not
 * cached. msg (operation->data) is freed on every path.
 *
 * pIndex is initialised to NULL up front because several early exits jump to
 * _return, which inspects it; ctgWriteTbIndexToCache() clears it once the
 * placeholder has been consumed.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY or an error from the
 * helpers called.
 */
int32_t ctgOpDropTbIndex(SCtgCacheOperation *operation) {
  int32_t             code = 0;
  SCtgDropTbIndexMsg *msg = operation->data;
  SCatalog           *pCtg = msg->pCtg;
  SCtgDBCache        *dbCache = NULL;
  STableIndex        *pIndex = NULL;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  CTG_ERR_JRET(ctgGetDBCache(pCtg, msg->dbFName, &dbCache));
  if (NULL == dbCache) {
    // Leave through the common exit so that msg is released.
    goto _return;
  }

  pIndex = taosMemoryCalloc(1, sizeof(STableIndex));
  if (NULL == pIndex) {
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }
  tstrncpy(pIndex->tbName, msg->tbName, sizeof(pIndex->tbName));
  tstrncpy(pIndex->dbFName, msg->dbFName, sizeof(pIndex->dbFName));
  pIndex->version = -1;

  // Pass the names from msg: they stay valid even if the callee frees the placeholder on failure.
  CTG_ERR_JRET(ctgWriteTbIndexToCache(pCtg, dbCache, msg->dbFName, msg->tbName, &pIndex));

_return:

  if (pIndex) {
    taosArrayDestroyEx(pIndex->pIndex, tFreeSTableIndexInfo);
    taosMemoryFreeClear(pIndex);
  }

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
2240
/*
 * Cache operation: clear or free catalog handles while holding gCtgMgmt.lock
 * for writing.
 *
 * With msg->pCtg set only that handle is processed, otherwise every
 * registered instance is. msg->freeCtg selects freeing over clearing.
 * msg (operation->data) is always freed before returning.
 */
int32_t ctgOpClearCache(SCtgCacheOperation *operation) {
  int32_t            code = 0;
  SCtgClearCacheMsg *msg = operation->data;
  SCatalog          *pCtg = msg->pCtg;

  CTG_LOCK(CTG_WRITE, &gCtgMgmt.lock);

  if (NULL != pCtg) {
    // A single handle was named.
    if (msg->freeCtg) {
      ctgFreeHandle(pCtg);
    } else {
      ctgClearHandle(pCtg);
    }
  } else {
    // No handle named: apply to all instances.
    if (msg->freeCtg) {
      ctgFreeAllInstance();
    } else {
      ctgClearAllInstance();
    }
  }

  CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.lock);

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

2272 2273 2274 2275 2276 2277 2278 2279
/*
 * Release the payload (op->data) of a cache operation without executing it.
 *
 * The payload layout depends on op->opId; each case frees the resources the
 * message owns and then the message itself. The operation object `op` is NOT
 * freed here. A NULL `op` or a NULL `op->data` is a no-op. Unknown op ids are
 * logged and the payload is left untouched.
 */
void ctgFreeCacheOperationData(SCtgCacheOperation *op) {
  if (NULL == op || NULL == op->data) {
    return;
  }

  switch (op->opId) {
    case CTG_OP_UPDATE_VGROUP: {
      SCtgUpdateVgMsg *msg = op->data;
      freeVgInfo(msg->dbInfo);
      taosMemoryFreeClear(op->data);
      break;
    }
    case CTG_OP_UPDATE_TB_META: {
      SCtgUpdateTbMetaMsg *msg = op->data;
      // guard the nested pointer the same way the TB_INDEX case guards pIndex
      if (msg->pMeta) {
        taosMemoryFreeClear(msg->pMeta->tbMeta);
        taosMemoryFreeClear(msg->pMeta);
      }
      taosMemoryFreeClear(op->data);
      break;
    }
    // messages that own nothing beyond their own allocation
    case CTG_OP_DROP_DB_CACHE:
    case CTG_OP_DROP_DB_VGROUP:
    case CTG_OP_DROP_STB_META:
    case CTG_OP_DROP_TB_META:
    case CTG_OP_UPDATE_VG_EPSET:
    case CTG_OP_DROP_TB_INDEX:
    case CTG_OP_CLEAR_CACHE: {
      taosMemoryFreeClear(op->data);
      break;
    }
    case CTG_OP_UPDATE_USER: {
      SCtgUpdateUserMsg *msg = op->data;
      taosHashCleanup(msg->userAuth.createdDbs);
      taosHashCleanup(msg->userAuth.readDbs);
      taosHashCleanup(msg->userAuth.writeDbs);
      taosHashCleanup(msg->userAuth.readTbs);
      taosHashCleanup(msg->userAuth.writeTbs);
      taosHashCleanup(msg->userAuth.useDbs);
      taosMemoryFreeClear(op->data);
      break;
    }
    case CTG_OP_UPDATE_TB_INDEX: {
      SCtgUpdateTbIndexMsg *msg = op->data;
      if (msg->pIndex) {
        taosArrayDestroyEx(msg->pIndex->pIndex, tFreeSTableIndexInfo);
        taosMemoryFreeClear(msg->pIndex);
      }
      taosMemoryFreeClear(op->data);
      break;
    }
    default: {
      qError("invalid cache op id:%d", op->opId);
      break;
    }
  }
}

D
dapan1121 已提交
2328
void ctgCleanupCacheQueue(void) {
dengyihao's avatar
dengyihao 已提交
2329 2330
  SCtgQNode          *node = NULL;
  SCtgQNode          *nodeNext = NULL;
D
dapan1121 已提交
2331
  SCtgCacheOperation *op = NULL;
dengyihao's avatar
dengyihao 已提交
2332
  bool                stopQueue = false;
D
dapan1121 已提交
2333 2334 2335 2336 2337

  while (true) {
    node = gCtgMgmt.queue.head->next;
    while (node) {
      if (node->op) {
D
dapan1121 已提交
2338 2339 2340 2341 2342 2343
        op = node->op;
        if (op->stopQueue) {
          SCatalog *pCtg = ((SCtgUpdateMsgHeader *)op->data)->pCtg;
          ctgDebug("process [%s] operation", gCtgCacheOperation[op->opId].name);
          (*gCtgCacheOperation[op->opId].func)(op);
          stopQueue = true;
D
dapan1121 已提交
2344
          CTG_STAT_RT_INC(numOfOpDequeue, 1);
D
dapan1121 已提交
2345
        } else {
2346
          ctgFreeCacheOperationData(op);
D
dapan1121 已提交
2347
          CTG_STAT_RT_INC(numOfOpAbort, 1);
D
dapan1121 已提交
2348
        }
dengyihao's avatar
dengyihao 已提交
2349

D
dapan1121 已提交
2350 2351
        if (op->syncOp) {
          tsem_post(&op->rspSem);
D
dapan1121 已提交
2352
        } else {
D
dapan1121 已提交
2353
          taosMemoryFree(op);
D
dapan1121 已提交
2354
        }
D
dapan1121 已提交
2355
      }
D
dapan1121 已提交
2356 2357 2358

      nodeNext = node->next;
      taosMemoryFree(node);
dengyihao's avatar
dengyihao 已提交
2359

D
dapan1121 已提交
2360
      node = nodeNext;
D
dapan1121 已提交
2361 2362
    }

D
dapan1121 已提交
2363
    if (!stopQueue) {
D
dapan1121 已提交
2364 2365 2366 2367
      taosUsleep(1);
    } else {
      break;
    }
D
dapan1121 已提交
2368 2369 2370 2371 2372 2373
  }

  taosMemoryFreeClear(gCtgMgmt.queue.head);
  gCtgMgmt.queue.tail = NULL;
}

dengyihao's avatar
dengyihao 已提交
2374
void *ctgUpdateThreadFunc(void *param) {
D
dapan1121 已提交
2375
  setThreadName("catalog");
2376

D
dapan1121 已提交
2377 2378 2379 2380 2381 2382
  qInfo("catalog update thread started");

  while (true) {
    if (tsem_wait(&gCtgMgmt.queue.reqSem)) {
      qError("ctg tsem_wait failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
    }
dengyihao's avatar
dengyihao 已提交
2383

2384
    if (atomic_load_8((int8_t *)&gCtgMgmt.queue.stopQueue)) {
D
dapan1121 已提交
2385
      ctgCleanupCacheQueue();
D
dapan1121 已提交
2386 2387 2388
      break;
    }

D
dapan1121 已提交
2389 2390 2391
    SCtgCacheOperation *operation = NULL;
    ctgDequeue(&operation);
    SCatalog *pCtg = ((SCtgUpdateMsgHeader *)operation->data)->pCtg;
D
dapan1121 已提交
2392

D
dapan1121 已提交
2393
    ctgDebug("process [%s] operation", gCtgCacheOperation[operation->opId].name);
dengyihao's avatar
dengyihao 已提交
2394

D
dapan1121 已提交
2395
    (*gCtgCacheOperation[operation->opId].func)(operation);
D
dapan1121 已提交
2396

D
dapan1121 已提交
2397
    if (operation->syncOp) {
D
dapan1121 已提交
2398
      tsem_post(&operation->rspSem);
D
dapan1121 已提交
2399 2400
    } else {
      taosMemoryFreeClear(operation);
D
dapan1121 已提交
2401 2402
    }

D
dapan1121 已提交
2403
    CTG_STAT_RT_INC(numOfOpDequeue, 1);
D
dapan1121 已提交
2404

D
dapan1121 已提交
2405
    ctgdShowCacheInfo();
D
dapan1121 已提交
2406
    ctgdShowStatInfo();
D
dapan1121 已提交
2407 2408 2409
  }

  qInfo("catalog update thread stopped");
dengyihao's avatar
dengyihao 已提交
2410

D
dapan1121 已提交
2411 2412 2413 2414 2415 2416 2417 2418 2419 2420 2421 2422
  return NULL;
}

/*
 * Create the joinable catalog update thread (ctgUpdateThreadFunc) and store
 * its handle in gCtgMgmt.updateThread.
 *
 * The thread attribute object is destroyed on every path, including when
 * thread creation fails.
 *
 * Returns TSDB_CODE_SUCCESS on success; on failure terrno is set to the
 * system error derived from errno and that value is returned.
 */
int32_t ctgStartUpdateThread() {
  int32_t      code = TSDB_CODE_SUCCESS;
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

  if (taosThreadCreate(&gCtgMgmt.updateThread, &thAttr, ctgUpdateThreadFunc, NULL) != 0) {
    // capture errno before any further call can overwrite it
    code = TAOS_SYSTEM_ERROR(errno);
    terrno = code;
  }

  taosThreadAttrDestroy(&thAttr);

  CTG_RET(code);
}

D
dapan1121 已提交
2428
/*
 * Look up a table's meta in the cache and decide whether it may be used.
 *
 * ctx->flag is marked as "sys db" when the table's database is a system
 * database. The meta read from cache is handed back through *pTableMeta only
 * if its table type matches the stb flag in ctx->flag and either no forced
 * update was requested or the database is a system database; otherwise the
 * copy is freed and *pTableMeta is set to NULL.
 *
 * When no usable meta is returned and the stb flag is still "unknown", it is
 * set from ctx->tbInfo.tbType.
 *
 * Returns the error from ctgReadTbMetaFromCache() if the read fails,
 * TSDB_CODE_SUCCESS otherwise (with or without a meta).
 */
int32_t ctgGetTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta **pTableMeta) {
  if (IS_SYS_DBNAME(ctx->pName->dbname)) {
    CTG_FLAG_SET_SYS_DB(ctx->flag);
  }

  CTG_ERR_RET(ctgReadTbMetaFromCache(pCtg, ctx, pTableMeta));

  if (NULL != *pTableMeta) {
    bool usable = CTG_FLAG_MATCH_STB(ctx->flag, (*pTableMeta)->tableType) &&
                  ((!CTG_FLAG_IS_FORCE_UPDATE(ctx->flag)) || (CTG_FLAG_IS_SYS_DB(ctx->flag)));
    if (usable) {
      return TSDB_CODE_SUCCESS;
    }

    // cached copy cannot be used for this request: drop it
    taosMemoryFreeClear(*pTableMeta);
    *pTableMeta = NULL;
  }

  if (CTG_FLAG_IS_UNKNOWN_STB(ctx->flag)) {
    CTG_FLAG_SET_STB(ctx->flag, ctx->tbInfo.tbType);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
2452
#if 0
/*
 * Compiled out (#if 0). Batch lookup of table metas from cache: for each name
 * in ctx->pNames a result slot is appended to ctx->pResList; names whose meta
 * is missing or unusable additionally get an SCtgFetch entry in ctx->pFetchs.
 * If nothing needs fetching, ctx->pResList is swapped into *pResList.
 */
int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetasCtx* ctx, SArray** pResList) {
  int32_t tbNum = taosArrayGetSize(ctx->pNames);
  SName* fName = taosArrayGet(ctx->pNames, 0);
  int32_t fIdx = 0;

  for (int32_t tbIdx = 0; tbIdx < tbNum; ++tbIdx) {
    SName* pName = taosArrayGet(ctx->pNames, tbIdx);

    SCtgTbMetaCtx nctx = {0};
    nctx.flag = CTG_FLAG_UNKNOWN_STB;
    nctx.pName = pName;

    if (IS_SYS_DBNAME(pName->dbname)) {
      CTG_FLAG_SET_SYS_DB(nctx.flag);
    }

    STableMeta *pTableMeta = NULL;
    CTG_ERR_RET(ctgReadTbMetaFromCache(pCtg, &nctx, &pTableMeta));

    SMetaRes res = {0};
    if (pTableMeta) {
      bool usable = CTG_FLAG_MATCH_STB(nctx.flag, pTableMeta->tableType) &&
                    ((!CTG_FLAG_IS_FORCE_UPDATE(nctx.flag)) || (CTG_FLAG_IS_SYS_DB(nctx.flag)));
      if (usable) {
        res.pRes = pTableMeta;
      } else {
        taosMemoryFreeClear(pTableMeta);
      }
    }

    if (NULL == res.pRes) {
      // cache miss: remember that this table has to be fetched
      if (NULL == ctx->pFetchs) {
        ctx->pFetchs = taosArrayInit(tbNum, sizeof(SCtgFetch));
      }

      if (CTG_FLAG_IS_UNKNOWN_STB(nctx.flag)) {
        CTG_FLAG_SET_STB(nctx.flag, nctx.tbInfo.tbType);
      }

      SCtgFetch fetch = {0};
      fetch.tbIdx = tbIdx;
      fetch.fetchIdx = fIdx++;
      fetch.flag = nctx.flag;

      taosArrayPush(ctx->pFetchs, &fetch);
    }

    taosArrayPush(ctx->pResList, &res);
  }

  if (NULL == ctx->pFetchs) {
    TSWAP(*pResList, ctx->pResList);
  }

  return TSDB_CODE_SUCCESS;
}
#endif

dengyihao's avatar
dengyihao 已提交
2509 2510 2511 2512 2513 2514 2515 2516 2517
/*
 * Resolve the metas of a list of tables (all taken to belong to the database
 * of the first entry) from the catalog cache.
 *
 * For every table in pList exactly one SMetaRes is appended to ctx->pResList:
 *   - on a hit, res.pRes is a heap copy of the table meta; for a child table
 *     the copy is the child header merged with its super table's schema part;
 *   - on a miss, an empty SMetaRes is appended and a fetch request is recorded
 *     through ctgAddFetch() with result index baseResIdx + i.
 *
 * Consecutive child tables sharing one super table reuse the previously built
 * meta (lastTableMeta) instead of looking the super table up again.
 *
 * pConn is not referenced in this function.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if a meta copy cannot
 * be allocated. On the error paths the cache references are handed to
 * ctgReleaseTbMetaToCache(); entries already appended to ctx->pResList stay
 * there.
 */
int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMetasCtx *ctx, int32_t dbIdx,
                               int32_t *fetchIdx, int32_t baseResIdx, SArray *pList) {
  int32_t tbNum = taosArrayGetSize(pList);
  if (tbNum <= 0) {
    // nothing to resolve; also avoids reading element 0 of an empty list
    return TSDB_CODE_SUCCESS;
  }

  SName      *pName = taosArrayGet(pList, 0);
  char        dbFName[TSDB_DB_FNAME_LEN] = {0};
  int32_t     flag = CTG_FLAG_UNKNOWN_STB;
  uint64_t    lastSuid = 0;
  STableMeta *lastTableMeta = NULL;

  if (IS_SYS_DBNAME(pName->dbname)) {
    CTG_FLAG_SET_SYS_DB(flag);
    strcpy(dbFName, pName->dbname);
  } else {
    tNameGetFullDbName(pName, dbFName);
  }

  SCtgDBCache *dbCache = NULL;
  SCtgTbCache *pCache = NULL;
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);

  if (NULL == dbCache) {
    // whole database is not cached: every table has to be fetched
    ctgDebug("db %s not in cache", dbFName);
    for (int32_t i = 0; i < tbNum; ++i) {
      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      taosArrayPush(ctx->pResList, &(SMetaRes){0});
    }

    return TSDB_CODE_SUCCESS;
  }

  for (int32_t i = 0; i < tbNum; ++i) {
    pName = taosArrayGet(pList, i);

    pCache = taosHashAcquire(dbCache->tbCache, pName->tname, strlen(pName->tname));
    if (NULL == pCache) {
      ctgDebug("tb %s not in cache, dbFName:%s", pName->tname, dbFName);
      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      taosArrayPush(ctx->pResList, &(SMetaRes){0});
      CTG_META_NHIT_INC();

      continue;
    }

    CTG_LOCK(CTG_READ, &pCache->metaLock);
    if (NULL == pCache->pMeta) {
      CTG_UNLOCK(CTG_READ, &pCache->metaLock);
      // give back the reference taken by taosHashAcquire, as every other path does
      taosHashRelease(dbCache->tbCache, pCache);

      ctgDebug("tb %s meta not in cache, dbFName:%s", pName->tname, dbFName);
      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      taosArrayPush(ctx->pResList, &(SMetaRes){0});
      CTG_META_NHIT_INC();

      continue;
    }

    STableMeta *tbMeta = pCache->pMeta;

    CTG_META_HIT_INC(tbMeta->tableType);

    // snapshot the fields needed after the lock is dropped
    SCtgTbMetaCtx nctx = {0};
    nctx.flag = flag;
    nctx.tbInfo.inCache = true;
    nctx.tbInfo.dbId = dbCache->dbId;
    nctx.tbInfo.suid = tbMeta->suid;
    nctx.tbInfo.tbType = tbMeta->tableType;

    SMetaRes    res = {0};
    STableMeta *pTableMeta = NULL;
    if (tbMeta->tableType != TSDB_CHILD_TABLE) {
      int32_t metaSize = CTG_META_SIZE(tbMeta);
      pTableMeta = taosMemoryCalloc(1, metaSize);
      if (NULL == pTableMeta) {
        ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
        CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
      }

      memcpy(pTableMeta, tbMeta, metaSize);

      CTG_UNLOCK(CTG_READ, &pCache->metaLock);
      taosHashRelease(dbCache->tbCache, pCache);

      // tbMeta must not be read past this point; log from the snapshot
      ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pName->tname, nctx.tbInfo.tbType, dbFName);

      res.pRes = pTableMeta;
      taosArrayPush(ctx->pResList, &res);

      continue;
    }

    // PROCESS FOR CHILD TABLE

    if (lastSuid && tbMeta->suid == lastSuid && lastTableMeta) {
      // same super table as the previous child: reuse its merged meta
      cloneTableMeta(lastTableMeta, &pTableMeta);
      if (NULL == pTableMeta) {
        ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
        CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
      }

      memcpy(pTableMeta, tbMeta, sizeof(SCTableMeta));

      CTG_UNLOCK(CTG_READ, &pCache->metaLock);
      taosHashRelease(dbCache->tbCache, pCache);

      ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pName->tname, nctx.tbInfo.tbType, dbFName);

      res.pRes = pTableMeta;
      taosArrayPush(ctx->pResList, &res);

      continue;
    }

    int32_t metaSize = sizeof(SCTableMeta);
    pTableMeta = taosMemoryCalloc(1, metaSize);
    if (NULL == pTableMeta) {
      ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    memcpy(pTableMeta, tbMeta, metaSize);

    CTG_UNLOCK(CTG_READ, &pCache->metaLock);
    taosHashRelease(dbCache->tbCache, pCache);

    ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", pName->tname,
             nctx.tbInfo.tbType, dbFName);

    char *stName = taosHashAcquire(dbCache->stbCache, &pTableMeta->suid, sizeof(pTableMeta->suid));
    if (NULL == stName) {
      ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", pTableMeta->suid, dbFName);
      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      taosArrayPush(ctx->pResList, &(SMetaRes){0});
      taosMemoryFreeClear(pTableMeta);

      CTG_META_NHIT_INC();
      continue;
    }

    pCache = taosHashAcquire(dbCache->tbCache, stName, strlen(stName));
    if (NULL == pCache) {
      ctgDebug("stb 0x%" PRIx64 " name %s not in cache, dbFName:%s", pTableMeta->suid, stName, dbFName);
      taosHashRelease(dbCache->stbCache, stName);

      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      taosArrayPush(ctx->pResList, &(SMetaRes){0});
      taosMemoryFreeClear(pTableMeta);

      CTG_META_NHIT_INC();
      continue;
    }

    taosHashRelease(dbCache->stbCache, stName);

    CTG_LOCK(CTG_READ, &pCache->metaLock);
    if (NULL == pCache->pMeta) {
      ctgDebug("stb 0x%" PRIx64 " meta not in cache, dbFName:%s", pTableMeta->suid, dbFName);
      CTG_UNLOCK(CTG_READ, &pCache->metaLock);
      taosHashRelease(dbCache->tbCache, pCache);

      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      taosArrayPush(ctx->pResList, &(SMetaRes){0});
      taosMemoryFreeClear(pTableMeta);

      CTG_META_NHIT_INC();
      continue;
    }

    STableMeta *stbMeta = pCache->pMeta;
    if (stbMeta->suid != nctx.tbInfo.suid) {
      // log while the entry is still locked: stbMeta must not be read after release
      ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%" PRIx64, stbMeta->suid,
               nctx.tbInfo.suid);

      CTG_UNLOCK(CTG_READ, &pCache->metaLock);
      taosHashRelease(dbCache->tbCache, pCache);

      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      taosArrayPush(ctx->pResList, &(SMetaRes){0});
      taosMemoryFreeClear(pTableMeta);

      CTG_META_NHIT_INC();
      continue;
    }

    // grow the child header so the super table's schema part fits behind it
    metaSize = CTG_META_SIZE(stbMeta);
    STableMeta *pMerged = taosMemoryRealloc(pTableMeta, metaSize);
    if (NULL == pMerged) {
      // the original buffer is still owned here when the resize fails
      taosMemoryFreeClear(pTableMeta);
      ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    pTableMeta = pMerged;

    memcpy(&pTableMeta->sversion, &stbMeta->sversion, metaSize - sizeof(SCTableMeta));

    CTG_UNLOCK(CTG_READ, &pCache->metaLock);
    taosHashRelease(dbCache->tbCache, pCache);

    CTG_META_HIT_INC(pTableMeta->tableType);

    res.pRes = pTableMeta;
    taosArrayPush(ctx->pResList, &res);

    // remember the merged meta for following children of the same super table
    lastSuid = pTableMeta->suid;
    lastTableMeta = pTableMeta;
  }

  ctgReleaseDBCache(pCtg, dbCache);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2711
/*
 * Queue the removal of a table's meta from the cache.
 *
 * The table is first read from cache to learn its type. If it is not cached
 * nothing is queued and success is returned. A super table is dropped through
 * ctgDropStbMetaEnqueue() (with its suid), any other table through
 * ctgDropTbMetaEnqueue(). syncReq is forwarded unchanged to the enqueue call.
 *
 * The meta copy obtained for the lookup is freed before returning.
 * Returns the first error from the read or the enqueue call, else 0.
 */
int32_t ctgRemoveTbMetaFromCache(SCatalog *pCtg, SName *pTableName, bool syncReq) {
  int32_t       code = 0;
  STableMeta   *cachedMeta = NULL;
  char          dbFName[TSDB_DB_FNAME_LEN];
  SCtgTbMetaCtx lookupCtx = {0};

  lookupCtx.pName = pTableName;
  lookupCtx.flag = CTG_FLAG_UNKNOWN_STB;

  CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &lookupCtx, &cachedMeta));

  if (NULL == cachedMeta) {
    ctgDebug("table already not in cache, db:%s, tblName:%s", pTableName->dbname, pTableName->tname);
    return TSDB_CODE_SUCCESS;
  }

  tNameGetFullDbName(pTableName, dbFName);

  if (TSDB_SUPER_TABLE != cachedMeta->tableType) {
    CTG_ERR_JRET(ctgDropTbMetaEnqueue(pCtg, dbFName, lookupCtx.tbInfo.dbId, pTableName->tname, syncReq));
  } else {
    CTG_ERR_JRET(
        ctgDropStbMetaEnqueue(pCtg, dbFName, lookupCtx.tbInfo.dbId, pTableName->tname, cachedMeta->suid, syncReq));
  }

_return:

  taosMemoryFreeClear(cachedMeta);

  CTG_RET(code);
}

/*
 * Resolve the vgroup a table hashes to, using only cached vgroup info.
 *
 * System databases are rejected with TSDB_CODE_CTG_INVALID_INPUT.
 * If the database's vgroup info is not cached, *pVgroup is set to NULL and
 * success is returned. Otherwise *pVgroup receives a newly allocated
 * SVgroupInfo filled in by ctgGetVgInfoFromHashValue(); the caller owns it.
 *
 * On any failure after the cache was acquired the cache reference is
 * released, *pVgroup is freed, and the error code is returned
 * (TSDB_CODE_OUT_OF_MEMORY if the result cannot be allocated).
 */
int32_t ctgGetTbHashVgroupFromCache(SCatalog *pCtg, const SName *pTableName, SVgroupInfo **pVgroup) {
  if (IS_SYS_DBNAME(pTableName->dbname)) {
    ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname);
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  SCtgDBCache *dbCache = NULL;
  int32_t      code = 0;
  char         dbFName[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, dbFName);

  CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache));

  if (NULL == dbCache) {
    *pVgroup = NULL;
    return TSDB_CODE_SUCCESS;
  }

  *pVgroup = taosMemoryCalloc(1, sizeof(SVgroupInfo));
  if (NULL == *pVgroup) {
    // do not hand a NULL output buffer to the hash lookup
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pTableName, *pVgroup));

_return:

  if (dbCache) {
    ctgReleaseVgInfoToCache(pCtg, dbCache);
  }

  if (code) {
    taosMemoryFreeClear(*pVgroup);
  }

  CTG_RET(code);
}