ctgCache.c 77.2 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "catalogInt.h"
dengyihao's avatar
dengyihao 已提交
17
#include "query.h"
D
dapan1121 已提交
18
#include "systable.h"
dengyihao's avatar
dengyihao 已提交
19 20
#include "tname.h"
#include "trpc.h"
D
dapan1121 已提交
21

dengyihao's avatar
dengyihao 已提交
22 23 24 25 26 27 28 29 30 31 32
SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = {{CTG_OP_UPDATE_VGROUP, "update vgInfo", ctgOpUpdateVgroup},
                                                {CTG_OP_UPDATE_TB_META, "update tbMeta", ctgOpUpdateTbMeta},
                                                {CTG_OP_DROP_DB_CACHE, "drop DB", ctgOpDropDbCache},
                                                {CTG_OP_DROP_DB_VGROUP, "drop DBVgroup", ctgOpDropDbVgroup},
                                                {CTG_OP_DROP_STB_META, "drop stbMeta", ctgOpDropStbMeta},
                                                {CTG_OP_DROP_TB_META, "drop tbMeta", ctgOpDropTbMeta},
                                                {CTG_OP_UPDATE_USER, "update user", ctgOpUpdateUser},
                                                {CTG_OP_UPDATE_VG_EPSET, "update epset", ctgOpUpdateEpset},
                                                {CTG_OP_UPDATE_TB_INDEX, "update tbIndex", ctgOpUpdateTbIndex},
                                                {CTG_OP_DROP_TB_INDEX, "drop tbIndex", ctgOpDropTbIndex},
                                                {CTG_OP_CLEAR_CACHE, "clear cache", ctgOpClearCache}};
D
dapan1121 已提交
33

D
dapan1121 已提交
34
SCtgCacheItemInfo gCtgStatItem[CTG_CI_MAX_VALUE] = {
X
Xiaoyu Wang 已提交
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
    {"Cluster   ", CTG_CI_FLAG_LEVEL_GLOBAL},   // CTG_CI_CLUSTER
    {"Dnode     ", CTG_CI_FLAG_LEVEL_CLUSTER},  // CTG_CI_DNODE,
    {"Qnode     ", CTG_CI_FLAG_LEVEL_CLUSTER},  // CTG_CI_QNODE,
    {"DB        ", CTG_CI_FLAG_LEVEL_CLUSTER},  // CTG_CI_DB,
    {"DbVgroup  ", CTG_CI_FLAG_LEVEL_DB},       // CTG_CI_DB_VGROUP,
    {"DbCfg     ", CTG_CI_FLAG_LEVEL_DB},       // CTG_CI_DB_CFG,
    {"DbInfo    ", CTG_CI_FLAG_LEVEL_DB},       // CTG_CI_DB_INFO,
    {"StbMeta   ", CTG_CI_FLAG_LEVEL_DB},       // CTG_CI_STABLE_META,
    {"NtbMeta   ", CTG_CI_FLAG_LEVEL_DB},       // CTG_CI_NTABLE_META,
    {"CtbMeta   ", CTG_CI_FLAG_LEVEL_DB},       // CTG_CI_CTABLE_META,
    {"SysTblMeta", CTG_CI_FLAG_LEVEL_DB},       // CTG_CI_SYSTABLE_META,
    {"OthTblMeta", CTG_CI_FLAG_LEVEL_DB},       // CTG_CI_OTHERTABLE_META,
    {"TblSMA    ", CTG_CI_FLAG_LEVEL_DB},       // CTG_CI_TBL_SMA,
    {"TblCfg    ", CTG_CI_FLAG_LEVEL_DB},       // CTG_CI_TBL_CFG,
    {"IndexInfo ", CTG_CI_FLAG_LEVEL_DB},       // CTG_CI_INDEX_INFO,
    {"User      ", CTG_CI_FLAG_LEVEL_CLUSTER},  // CTG_CI_USER,
    {"UDF       ", CTG_CI_FLAG_LEVEL_CLUSTER},  // CTG_CI_UDF,
    {"SvrVer    ", CTG_CI_FLAG_LEVEL_CLUSTER}   // CTG_CI_SVR_VER,
D
dapan1121 已提交
53 54
};

D
dapan1121 已提交
55 56
int32_t ctgRLockVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache, bool *inCache) {
  CTG_LOCK(CTG_READ, &dbCache->vgCache.vgLock);
dengyihao's avatar
dengyihao 已提交
57

D
dapan1121 已提交
58
  if (dbCache->deleted) {
D
dapan1121 已提交
59
    CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock);
D
dapan1121 已提交
60

dengyihao's avatar
dengyihao 已提交
61 62
    ctgDebug("db is dropping, dbId:0x%" PRIx64, dbCache->dbId);

D
dapan1121 已提交
63 64 65 66
    *inCache = false;
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
67 68
  if (NULL == dbCache->vgCache.vgInfo) {
    CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock);
D
dapan1121 已提交
69 70

    *inCache = false;
dengyihao's avatar
dengyihao 已提交
71
    ctgDebug("db vgInfo is empty, dbId:0x%" PRIx64, dbCache->dbId);
D
dapan1121 已提交
72 73 74 75
    return TSDB_CODE_SUCCESS;
  }

  *inCache = true;
dengyihao's avatar
dengyihao 已提交
76

D
dapan1121 已提交
77 78 79
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
80 81
int32_t ctgWLockVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache) {
  CTG_LOCK(CTG_WRITE, &dbCache->vgCache.vgLock);
D
dapan1121 已提交
82 83

  if (dbCache->deleted) {
dengyihao's avatar
dengyihao 已提交
84
    ctgDebug("db is dropping, dbId:0x%" PRIx64, dbCache->dbId);
D
dapan1121 已提交
85
    CTG_UNLOCK(CTG_WRITE, &dbCache->vgCache.vgLock);
D
dapan1121 已提交
86 87 88 89 90 91
    CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
92
void ctgRUnlockVgInfo(SCtgDBCache *dbCache) { CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock); }
D
dapan1121 已提交
93

dengyihao's avatar
dengyihao 已提交
94
void ctgWUnlockVgInfo(SCtgDBCache *dbCache) { CTG_UNLOCK(CTG_WRITE, &dbCache->vgCache.vgLock); }
D
dapan1121 已提交
95

dengyihao's avatar
dengyihao 已提交
96 97
void ctgReleaseDBCache(SCatalog *pCtg, SCtgDBCache *dbCache) {
  CTG_UNLOCK(CTG_READ, &dbCache->dbLock);
D
dapan1121 已提交
98 99
  taosHashRelease(pCtg->dbCache, dbCache);
}
D
dapan1121 已提交
100

dengyihao's avatar
dengyihao 已提交
101
int32_t ctgAcquireDBCacheImpl(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache, bool acquire) {
D
dapan1121 已提交
102
  char *p = strchr(dbFName, '.');
D
dapan1121 已提交
103
  if (p && IS_SYS_DBNAME(p + 1)) {
D
dapan1121 已提交
104 105 106
    dbFName = p + 1;
  }

D
dapan1121 已提交
107 108 109 110 111 112 113
  SCtgDBCache *dbCache = NULL;

  if (acquire) {
    dbCache = (SCtgDBCache *)taosHashAcquire(pCtg->dbCache, dbFName, strlen(dbFName));
  } else {
    dbCache = (SCtgDBCache *)taosHashGet(pCtg->dbCache, dbFName, strlen(dbFName));
  }
dengyihao's avatar
dengyihao 已提交
114

D
dapan1121 已提交
115 116
  if (NULL == dbCache) {
    *pCache = NULL;
D
dapan1121 已提交
117
    CTG_CACHE_NHIT_INC(CTG_CI_DB, 1);
D
dapan1121 已提交
118 119 120 121
    ctgDebug("db not in cache, dbFName:%s", dbFName);
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
122 123 124 125
  if (acquire) {
    CTG_LOCK(CTG_READ, &dbCache->dbLock);
  }

D
dapan1121 已提交
126 127 128
  if (dbCache->deleted) {
    if (acquire) {
      ctgReleaseDBCache(pCtg, dbCache);
dengyihao's avatar
dengyihao 已提交
129 130
    }

D
dapan1121 已提交
131
    *pCache = NULL;
D
dapan1121 已提交
132
    CTG_CACHE_NHIT_INC(CTG_CI_DB, 1);
D
dapan1121 已提交
133 134 135 136 137
    ctgDebug("db is removing from cache, dbFName:%s", dbFName);
    return TSDB_CODE_SUCCESS;
  }

  *pCache = dbCache;
D
dapan1121 已提交
138
  CTG_CACHE_HIT_INC(CTG_CI_DB, 1);
dengyihao's avatar
dengyihao 已提交
139

D
dapan1121 已提交
140 141 142
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
143
int32_t ctgAcquireDBCache(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache) {
D
dapan1121 已提交
144 145 146
  CTG_RET(ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, true));
}

dengyihao's avatar
dengyihao 已提交
147
int32_t ctgGetDBCache(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache) {
D
dapan1121 已提交
148 149 150
  CTG_RET(ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, false));
}

dengyihao's avatar
dengyihao 已提交
151
void ctgReleaseVgInfoToCache(SCatalog *pCtg, SCtgDBCache *dbCache) {
D
dapan1121 已提交
152 153 154
  ctgRUnlockVgInfo(dbCache);
  ctgReleaseDBCache(pCtg, dbCache);
}
D
dapan1121 已提交
155

dengyihao's avatar
dengyihao 已提交
156
void ctgReleaseTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache) {
D
dapan1121 已提交
157
  if (pCache && dbCache) {
D
dapan1121 已提交
158
    CTG_UNLOCK(CTG_READ, &pCache->metaLock);
dengyihao's avatar
dengyihao 已提交
159
    taosHashRelease(dbCache->tbCache, pCache);
D
dapan1121 已提交
160
  }
D
dapan1121 已提交
161

D
dapan1121 已提交
162 163
  if (dbCache) {
    ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
164
  }
D
dapan1121 已提交
165
}
D
dapan1121 已提交
166

dengyihao's avatar
dengyihao 已提交
167
void ctgReleaseTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache) {
D
dapan1121 已提交
168 169
  if (pCache) {
    CTG_UNLOCK(CTG_READ, &pCache->indexLock);
dengyihao's avatar
dengyihao 已提交
170
    taosHashRelease(dbCache->tbCache, pCache);
D
dapan1121 已提交
171 172 173 174 175 176 177
  }

  if (dbCache) {
    ctgReleaseDBCache(pCtg, dbCache);
  }
}

178
void ctgReleaseVgMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache) {
D
dapan1121 已提交
179
  if (pCache && dbCache) {
180 181 182 183
    CTG_UNLOCK(CTG_READ, &pCache->metaLock);
    taosHashRelease(dbCache->tbCache, pCache);
  }

D
dapan1121 已提交
184 185 186 187
  if (dbCache) {
    ctgRUnlockVgInfo(dbCache);
    ctgReleaseDBCache(pCtg, dbCache);
  }
188 189
}

dengyihao's avatar
dengyihao 已提交
190
int32_t ctgAcquireVgInfoFromCache(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache) {
D
dapan1121 已提交
191
  SCtgDBCache *dbCache = NULL;
D
dapan1121 已提交
192
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
dengyihao's avatar
dengyihao 已提交
193
  if (NULL == dbCache) {
D
dapan1121 已提交
194 195 196 197 198
    ctgDebug("db %s not in cache", dbFName);
    goto _return;
  }

  bool inCache = false;
D
dapan1121 已提交
199
  ctgRLockVgInfo(pCtg, dbCache, &inCache);
D
dapan1121 已提交
200 201 202 203 204 205 206
  if (!inCache) {
    ctgDebug("vgInfo of db %s not in cache", dbFName);
    goto _return;
  }

  *pCache = dbCache;

D
dapan1121 已提交
207
  CTG_CACHE_HIT_INC(CTG_CI_DB_VGROUP, 1);
D
dapan1121 已提交
208 209

  ctgDebug("Got db vgInfo from cache, dbFName:%s", dbFName);
dengyihao's avatar
dengyihao 已提交
210

D
dapan1121 已提交
211 212 213 214 215 216 217 218 219 220
  return TSDB_CODE_SUCCESS;

_return:

  if (dbCache) {
    ctgReleaseDBCache(pCtg, dbCache);
  }

  *pCache = NULL;

D
dapan1121 已提交
221
  CTG_CACHE_NHIT_INC(CTG_CI_DB_VGROUP, 1);
dengyihao's avatar
dengyihao 已提交
222

D
dapan1121 已提交
223 224 225
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
226
int32_t ctgAcquireTbMetaFromCache(SCatalog *pCtg, char *dbFName, char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb) {
D
dapan1121 已提交
227
  SCtgDBCache *dbCache = NULL;
dengyihao's avatar
dengyihao 已提交
228
  SCtgTbCache *pCache = NULL;
D
dapan1121 已提交
229 230 231 232 233
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
  if (NULL == dbCache) {
    ctgDebug("db %s not in cache", dbFName);
    goto _return;
  }
dengyihao's avatar
dengyihao 已提交
234

D
dapan1121 已提交
235
  pCache = taosHashAcquire(dbCache->tbCache, tbName, strlen(tbName));
D
dapan1121 已提交
236 237 238
  if (NULL == pCache) {
    ctgDebug("tb %s not in cache, dbFName:%s", tbName, dbFName);
    goto _return;
D
dapan1121 已提交
239 240
  }

D
dapan1121 已提交
241 242 243 244 245 246 247 248 249 250
  CTG_LOCK(CTG_READ, &pCache->metaLock);
  if (NULL == pCache->pMeta) {
    ctgDebug("tb %s meta not in cache, dbFName:%s", tbName, dbFName);
    goto _return;
  }

  *pDb = dbCache;
  *pTb = pCache;

  ctgDebug("tb %s meta got in cache, dbFName:%s", tbName, dbFName);
dengyihao's avatar
dengyihao 已提交
251

D
dapan1121 已提交
252
  CTG_META_HIT_INC(pCache->pMeta->tableType);
D
dapan1121 已提交
253 254 255 256 257 258 259

  return TSDB_CODE_SUCCESS;

_return:

  ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);

D
dapan1121 已提交
260
  CTG_META_NHIT_INC();
dengyihao's avatar
dengyihao 已提交
261

D
dapan1121 已提交
262 263 264
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
265 266
int32_t ctgAcquireVgMetaFromCache(SCatalog *pCtg, const char *dbFName, const char *tbName, SCtgDBCache **pDb,
                                  SCtgTbCache **pTb) {
267 268
  SCtgDBCache *dbCache = NULL;
  SCtgTbCache *tbCache = NULL;
X
Xiaoyu Wang 已提交
269
  bool         vgInCache = false;
270 271 272 273

  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
  if (NULL == dbCache) {
    ctgDebug("db %s not in cache", dbFName);
D
dapan1121 已提交
274
    CTG_CACHE_NHIT_INC(CTG_CI_DB_VGROUP, 1);
275 276 277 278 279 280
    goto _return;
  }

  ctgRLockVgInfo(pCtg, dbCache, &vgInCache);
  if (!vgInCache) {
    ctgDebug("vgInfo of db %s not in cache", dbFName);
D
dapan1121 已提交
281
    CTG_CACHE_NHIT_INC(CTG_CI_DB_VGROUP, 1);
282 283 284 285 286
    goto _return;
  }

  *pDb = dbCache;

D
dapan1121 已提交
287
  CTG_CACHE_HIT_INC(CTG_CI_DB_VGROUP, 1);
288 289 290 291 292 293

  ctgDebug("Got db vgInfo from cache, dbFName:%s", dbFName);

  tbCache = taosHashAcquire(dbCache->tbCache, tbName, strlen(tbName));
  if (NULL == tbCache) {
    ctgDebug("tb %s not in cache, dbFName:%s", tbName, dbFName);
D
dapan1121 已提交
294
    CTG_META_NHIT_INC();
295 296 297 298 299 300
    goto _return;
  }

  CTG_LOCK(CTG_READ, &tbCache->metaLock);
  if (NULL == tbCache->pMeta) {
    ctgDebug("tb %s meta not in cache, dbFName:%s", tbName, dbFName);
D
dapan1121 已提交
301
    CTG_META_NHIT_INC();
302 303 304 305 306 307 308
    goto _return;
  }

  *pTb = tbCache;

  ctgDebug("tb %s meta got in cache, dbFName:%s", tbName, dbFName);

D
dapan1121 已提交
309
  CTG_META_HIT_INC(tbCache->pMeta->tableType);
310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333

  return TSDB_CODE_SUCCESS;

_return:

  if (tbCache) {
    CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
    taosHashRelease(dbCache->tbCache, tbCache);
  }

  if (vgInCache) {
    ctgRUnlockVgInfo(dbCache);
  }

  if (dbCache) {
    ctgReleaseDBCache(pCtg, dbCache);
  }

  *pDb = NULL;
  *pTb = NULL;

  return TSDB_CODE_SUCCESS;
}

334
/*
dengyihao's avatar
dengyihao 已提交
335 336 337
int32_t ctgAcquireStbMetaFromCache(SCatalog *pCtg, char *dbFName, uint64_t suid, SCtgDBCache **pDb, SCtgTbCache **pTb) {
  SCtgDBCache *dbCache = NULL;
  SCtgTbCache *pCache = NULL;
D
dapan1121 已提交
338 339 340 341 342
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
  if (NULL == dbCache) {
    ctgDebug("db %s not in cache", dbFName);
    goto _return;
  }
dengyihao's avatar
dengyihao 已提交
343 344

  char *stName = taosHashAcquire(dbCache->stbCache, &suid, sizeof(suid));
D
dapan1121 已提交
345
  if (NULL == stName) {
D
dapan1121 已提交
346
    ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", suid, dbFName);
D
dapan1121 已提交
347 348 349 350 351
    goto _return;
  }

  pCache = taosHashAcquire(dbCache->tbCache, stName, strlen(stName));
  if (NULL == pCache) {
D
dapan1121 已提交
352
    ctgDebug("stb 0x%" PRIx64 " name %s not in cache, dbFName:%s", suid, stName, dbFName);
D
dapan1121 已提交
353 354 355 356
    taosHashRelease(dbCache->stbCache, stName);
    goto _return;
  }

D
dapan1121 已提交
357 358
  taosHashRelease(dbCache->stbCache, stName);

D
dapan1121 已提交
359 360
  CTG_LOCK(CTG_READ, &pCache->metaLock);
  if (NULL == pCache->pMeta) {
D
dapan1121 已提交
361
    ctgDebug("stb 0x%" PRIx64 " meta not in cache, dbFName:%s", suid, dbFName);
D
dapan1121 已提交
362 363 364 365 366 367
    goto _return;
  }

  *pDb = dbCache;
  *pTb = pCache;

D
dapan1121 已提交
368
  ctgDebug("stb 0x%" PRIx64 " meta got in cache, dbFName:%s", suid, dbFName);
dengyihao's avatar
dengyihao 已提交
369

D
dapan1121 已提交
370
  CTG_META_HIT_INC(pCache->pMeta->tableType, 1);
D
dapan1121 已提交
371 372 373 374 375 376 377

  return TSDB_CODE_SUCCESS;

_return:

  ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);

D
dapan1121 已提交
378
  CTG_META_NHIT_INC(1);
D
dapan1121 已提交
379 380 381

  *pDb = NULL;
  *pTb = NULL;
dengyihao's avatar
dengyihao 已提交
382

D
dapan1121 已提交
383 384
  return TSDB_CODE_SUCCESS;
}
385
*/
D
dapan1121 已提交
386

X
Xiaoyu Wang 已提交
387 388
int32_t ctgAcquireStbMetaFromCache(SCtgDBCache *dbCache, SCatalog *pCtg, char *dbFName, uint64_t suid,
                                   SCtgTbCache **pTb) {
D
dapan1121 已提交
389
  SCtgTbCache *pCache = NULL;
X
Xiaoyu Wang 已提交
390
  char        *stName = taosHashAcquire(dbCache->stbCache, &suid, sizeof(suid));
D
dapan1121 已提交
391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414
  if (NULL == stName) {
    ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", suid, dbFName);
    goto _return;
  }

  pCache = taosHashAcquire(dbCache->tbCache, stName, strlen(stName));
  if (NULL == pCache) {
    ctgDebug("stb 0x%" PRIx64 " name %s not in cache, dbFName:%s", suid, stName, dbFName);
    taosHashRelease(dbCache->stbCache, stName);
    goto _return;
  }

  taosHashRelease(dbCache->stbCache, stName);

  CTG_LOCK(CTG_READ, &pCache->metaLock);
  if (NULL == pCache->pMeta) {
    ctgDebug("stb 0x%" PRIx64 " meta not in cache, dbFName:%s", suid, dbFName);
    goto _return;
  }

  *pTb = pCache;

  ctgDebug("stb 0x%" PRIx64 " meta got in cache, dbFName:%s", suid, dbFName);

D
dapan1121 已提交
415
  CTG_META_HIT_INC(pCache->pMeta->tableType);
D
dapan1121 已提交
416 417 418 419 420 421 422

  return TSDB_CODE_SUCCESS;

_return:

  ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);

D
dapan1121 已提交
423
  CTG_META_NHIT_INC();
D
dapan1121 已提交
424 425 426 427 428 429

  *pTb = NULL;

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
430
int32_t ctgAcquireTbIndexFromCache(SCatalog *pCtg, char *dbFName, char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb) {
D
dapan1121 已提交
431
  SCtgDBCache *dbCache = NULL;
dengyihao's avatar
dengyihao 已提交
432
  SCtgTbCache *pCache = NULL;
D
dapan1121 已提交
433 434
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
  if (NULL == dbCache) {
D
dapan1121 已提交
435 436 437
    ctgDebug("db %s not in cache", dbFName);
    goto _return;
  }
dengyihao's avatar
dengyihao 已提交
438

D
dapan1121 已提交
439
  int32_t sz = 0;
D
dapan1121 已提交
440
  pCache = taosHashAcquire(dbCache->tbCache, tbName, strlen(tbName));
D
dapan1121 已提交
441 442 443 444 445 446 447 448 449
  if (NULL == pCache) {
    ctgDebug("tb %s not in cache, dbFName:%s", tbName, dbFName);
    goto _return;
  }

  CTG_LOCK(CTG_READ, &pCache->indexLock);
  if (NULL == pCache->pIndex) {
    ctgDebug("tb %s index not in cache, dbFName:%s", tbName, dbFName);
    goto _return;
D
dapan1121 已提交
450 451
  }

D
dapan1121 已提交
452 453 454 455
  *pDb = dbCache;
  *pTb = pCache;

  ctgDebug("tb %s index got in cache, dbFName:%s", tbName, dbFName);
dengyihao's avatar
dengyihao 已提交
456

D
dapan1121 已提交
457
  CTG_CACHE_HIT_INC(CTG_CI_TBL_SMA, 1);
D
dapan1121 已提交
458 459 460 461 462 463 464

  return TSDB_CODE_SUCCESS;

_return:

  ctgReleaseTbIndexToCache(pCtg, dbCache, pCache);

D
dapan1121 已提交
465
  CTG_CACHE_NHIT_INC(CTG_CI_TBL_SMA, 1);
dengyihao's avatar
dengyihao 已提交
466

D
dapan1121 已提交
467 468 469
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
470
int32_t ctgTbMetaExistInCache(SCatalog *pCtg, char *dbFName, char *tbName, int32_t *exist) {
D
dapan1121 已提交
471
  SCtgDBCache *dbCache = NULL;
D
dapan1121 已提交
472
  SCtgTbCache *tbCache = NULL;
D
dapan1121 已提交
473 474 475
  ctgAcquireTbMetaFromCache(pCtg, dbFName, tbName, &dbCache, &tbCache);
  if (NULL == tbCache) {
    ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
dengyihao's avatar
dengyihao 已提交
476

D
dapan1121 已提交
477 478 479 480 481
    *exist = 0;
    return TSDB_CODE_SUCCESS;
  }

  *exist = 1;
D
dapan1121 已提交
482
  ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
dengyihao's avatar
dengyihao 已提交
483

D
dapan1121 已提交
484 485 486
  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
487 488
int32_t ctgCopyTbMeta(SCatalog *pCtg, SCtgTbMetaCtx *ctx, SCtgDBCache **pDb, SCtgTbCache **pTb, STableMeta **pTableMeta,
                      char *dbFName) {
D
dapan1121 已提交
489
  SCtgDBCache *dbCache = *pDb;
490
  SCtgTbCache *tbCache = *pTb;
X
Xiaoyu Wang 已提交
491
  STableMeta  *tbMeta = tbCache->pMeta;
D
dapan1121 已提交
492 493 494 495
  ctx->tbInfo.inCache = true;
  ctx->tbInfo.dbId = dbCache->dbId;
  ctx->tbInfo.suid = tbMeta->suid;
  ctx->tbInfo.tbType = tbMeta->tableType;
dengyihao's avatar
dengyihao 已提交
496

D
dapan1121 已提交
497
  if (tbMeta->tableType != TSDB_CHILD_TABLE) {
D
dapan1121 已提交
498 499 500
    int32_t metaSize = CTG_META_SIZE(tbMeta);
    *pTableMeta = taosMemoryCalloc(1, metaSize);
    if (NULL == *pTableMeta) {
501
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
502 503 504
    }

    memcpy(*pTableMeta, tbMeta, metaSize);
dengyihao's avatar
dengyihao 已提交
505

D
dapan1121 已提交
506
    ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", ctx->pName->tname, tbMeta->tableType, dbFName);
D
dapan1121 已提交
507 508
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
509 510

  // PROCESS FOR CHILD TABLE
dengyihao's avatar
dengyihao 已提交
511

D
dapan1121 已提交
512 513 514
  int32_t metaSize = sizeof(SCTableMeta);
  *pTableMeta = taosMemoryCalloc(1, metaSize);
  if (NULL == *pTableMeta) {
515
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
516 517
  }

D
dapan1121 已提交
518
  memcpy(*pTableMeta, tbMeta, metaSize);
dengyihao's avatar
dengyihao 已提交
519

X
Xiaoyu Wang 已提交
520
  // ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
D
dapan1121 已提交
521

522 523 524
  CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
  taosHashRelease(dbCache->tbCache, tbCache);
  *pTb = NULL;
X
Xiaoyu Wang 已提交
525

dengyihao's avatar
dengyihao 已提交
526 527
  ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", ctx->pName->tname,
           ctx->tbInfo.tbType, dbFName);
D
dapan1121 已提交
528

529
  ctgAcquireStbMetaFromCache(dbCache, pCtg, dbFName, ctx->tbInfo.suid, &tbCache);
D
dapan1121 已提交
530 531
  if (NULL == tbCache) {
    taosMemoryFreeClear(*pTableMeta);
D
dapan1121 已提交
532
    *pDb = NULL;
D
dapan1121 已提交
533 534 535
    ctgDebug("stb 0x%" PRIx64 " meta not in cache", ctx->tbInfo.suid);
    return TSDB_CODE_SUCCESS;
  }
dengyihao's avatar
dengyihao 已提交
536

537 538
  *pTb = tbCache;

dengyihao's avatar
dengyihao 已提交
539 540 541
  STableMeta *stbMeta = tbCache->pMeta;
  if (stbMeta->suid != ctx->tbInfo.suid) {
    ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%" PRIx64, stbMeta->suid, ctx->tbInfo.suid);
542 543
    taosMemoryFreeClear(*pTableMeta);
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
544 545
  }

D
dapan1121 已提交
546
  metaSize = CTG_META_SIZE(stbMeta);
D
dapan1121 已提交
547
  *pTableMeta = taosMemoryRealloc(*pTableMeta, metaSize);
dengyihao's avatar
dengyihao 已提交
548
  if (NULL == *pTableMeta) {
549
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
550 551
  }

D
dapan1121 已提交
552
  memcpy(&(*pTableMeta)->sversion, &stbMeta->sversion, metaSize - sizeof(SCTableMeta));
D
dapan1121 已提交
553

554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575
  return TSDB_CODE_SUCCESS;
}

int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta **pTableMeta) {
  int32_t      code = 0;
  SCtgDBCache *dbCache = NULL;
  SCtgTbCache *tbCache = NULL;
  *pTableMeta = NULL;

  char dbFName[TSDB_DB_FNAME_LEN] = {0};
  if (CTG_FLAG_IS_SYS_DB(ctx->flag)) {
    strcpy(dbFName, ctx->pName->dbname);
  } else {
    tNameGetFullDbName(ctx->pName, dbFName);
  }

  ctgAcquireTbMetaFromCache(pCtg, dbFName, ctx->pName->tname, &dbCache, &tbCache);
  if (NULL == tbCache) {
    ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
576
  CTG_ERR_JRET(ctgCopyTbMeta(pCtg, ctx, &dbCache, &tbCache, pTableMeta, dbFName));
577

D
dapan1121 已提交
578
  ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
D
dapan1121 已提交
579

D
dapan1121 已提交
580
  ctgDebug("Got tb %s meta from cache, dbFName:%s", ctx->pName->tname, dbFName);
dengyihao's avatar
dengyihao 已提交
581

D
dapan1121 已提交
582 583 584 585
  return TSDB_CODE_SUCCESS;

_return:

D
dapan1121 已提交
586
  ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
D
dapan1121 已提交
587
  taosMemoryFreeClear(*pTableMeta);
dengyihao's avatar
dengyihao 已提交
588
  *pTableMeta = NULL;
dengyihao's avatar
dengyihao 已提交
589

D
dapan1121 已提交
590 591 592
  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
593 594
int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver, int32_t *tver, int32_t *tbType,
                              uint64_t *suid, char *stbName) {
D
dapan1121 已提交
595
  *sver = -1;
D
dapan1121 已提交
596
  *tver = -1;
D
dapan1121 已提交
597 598

  SCtgDBCache *dbCache = NULL;
dengyihao's avatar
dengyihao 已提交
599
  SCtgTbCache *tbCache = NULL;
D
dapan1121 已提交
600
  char         dbFName[TSDB_DB_FNAME_LEN] = {0};
D
dapan1121 已提交
601 602
  tNameGetFullDbName(pTableName, dbFName);

D
dapan1121 已提交
603 604 605
  ctgAcquireTbMetaFromCache(pCtg, dbFName, pTableName->tname, &dbCache, &tbCache);
  if (NULL == tbCache) {
    ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
D
dapan1121 已提交
606 607 608
    return TSDB_CODE_SUCCESS;
  }

dengyihao's avatar
dengyihao 已提交
609
  STableMeta *tbMeta = tbCache->pMeta;
D
dapan1121 已提交
610 611
  *tbType = tbMeta->tableType;
  *suid = tbMeta->suid;
D
dapan1121 已提交
612

D
dapan1121 已提交
613
  if (*tbType != TSDB_CHILD_TABLE) {
D
dapan1121 已提交
614 615 616
    *sver = tbMeta->sversion;
    *tver = tbMeta->tversion;

dengyihao's avatar
dengyihao 已提交
617 618
    ctgDebug("Got tb %s ver from cache, dbFName:%s, tbType:%d, sver:%d, tver:%d, suid:0x%" PRIx64, pTableName->tname,
             dbFName, *tbType, *sver, *tver, *suid);
D
dapan1121 已提交
619

D
dapan1121 已提交
620
    ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
D
dapan1121 已提交
621 622 623
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
624
  // PROCESS FOR CHILD TABLE
dengyihao's avatar
dengyihao 已提交
625

X
Xiaoyu Wang 已提交
626
  // ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
627 628 629 630
  if (tbCache) {
    CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
    taosHashRelease(dbCache->tbCache, tbCache);
  }
X
Xiaoyu Wang 已提交
631

D
dapan1121 已提交
632
  ctgDebug("Got ctb %s ver from cache, will continue to get its stb ver, dbFName:%s", pTableName->tname, dbFName);
dengyihao's avatar
dengyihao 已提交
633

634
  ctgAcquireStbMetaFromCache(dbCache, pCtg, dbFName, *suid, &tbCache);
D
dapan1121 已提交
635
  if (NULL == tbCache) {
X
Xiaoyu Wang 已提交
636
    // ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
D
dapan1121 已提交
637
    ctgDebug("stb 0x%" PRIx64 " meta not in cache", *suid);
D
dapan1121 已提交
638 639
    return TSDB_CODE_SUCCESS;
  }
dengyihao's avatar
dengyihao 已提交
640 641

  STableMeta *stbMeta = tbCache->pMeta;
D
dapan1121 已提交
642
  if (stbMeta->suid != *suid) {
D
dapan1121 已提交
643
    ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
dengyihao's avatar
dengyihao 已提交
644
    ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid:0x%" PRIx64, stbMeta->suid, *suid);
D
dapan1121 已提交
645 646 647
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

D
dapan1121 已提交
648
  size_t nameLen = 0;
D
dapan1121 已提交
649
  char  *name = taosHashGetKey(tbCache, &nameLen);
D
dapan1121 已提交
650 651 652 653

  strncpy(stbName, name, nameLen);
  stbName[nameLen] = 0;

D
dapan1121 已提交
654 655
  *sver = stbMeta->sversion;
  *tver = stbMeta->tversion;
D
dapan1121 已提交
656

D
dapan1121 已提交
657
  ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
D
dapan1121 已提交
658

dengyihao's avatar
dengyihao 已提交
659 660
  ctgDebug("Got tb %s sver %d tver %d from cache, type:%d, dbFName:%s", pTableName->tname, *sver, *tver, *tbType,
           dbFName);
D
dapan1121 已提交
661 662 663 664

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
665
int32_t ctgReadTbTypeFromCache(SCatalog *pCtg, char *dbFName, char *tbName, int32_t *tbType) {
D
dapan1121 已提交
666
  SCtgDBCache *dbCache = NULL;
dengyihao's avatar
dengyihao 已提交
667
  SCtgTbCache *tbCache = NULL;
D
dapan1121 已提交
668
  CTG_ERR_RET(ctgAcquireTbMetaFromCache(pCtg, dbFName, tbName, &dbCache, &tbCache));
D
dapan1121 已提交
669 670
  if (NULL == tbCache) {
    ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
D
dapan1121 已提交
671 672
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
673 674 675 676

  *tbType = tbCache->pMeta->tableType;
  ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);

dengyihao's avatar
dengyihao 已提交
677 678
  ctgDebug("Got tb %s tbType %d from cache, dbFName:%s", tbName, *tbType, dbFName);

D
dapan1121 已提交
679 680 681
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
682 683
int32_t ctgReadTbIndexFromCache(SCatalog *pCtg, SName *pTableName, SArray **pRes) {
  int32_t      code = 0;
D
dapan1121 已提交
684
  SCtgDBCache *dbCache = NULL;
dengyihao's avatar
dengyihao 已提交
685
  SCtgTbCache *tbCache = NULL;
D
dapan1121 已提交
686 687
  char         dbFName[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, dbFName);
D
dapan1121 已提交
688

D
dapan1121 已提交
689
  *pRes = NULL;
D
dapan1121 已提交
690

D
dapan1121 已提交
691 692 693
  ctgAcquireTbIndexFromCache(pCtg, dbFName, pTableName->tname, &dbCache, &tbCache);
  if (NULL == tbCache) {
    ctgReleaseTbIndexToCache(pCtg, dbCache, tbCache);
D
dapan1121 已提交
694 695 696
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
697
  CTG_ERR_JRET(ctgCloneTableIndex(tbCache->pIndex->pIndex, pRes));
D
dapan1121 已提交
698

D
dapan1121 已提交
699
_return:
D
dapan1121 已提交
700

D
dapan1121 已提交
701
  ctgReleaseTbIndexToCache(pCtg, dbCache, tbCache);
D
dapan1121 已提交
702

D
dapan1121 已提交
703
  CTG_RET(code);
D
dapan1121 已提交
704 705
}

706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728
int32_t ctgGetCachedStbNameFromSuid(SCatalog* pCtg, char* dbFName, uint64_t suid, char **stbName) {
  *stbName = NULL;

  SCtgDBCache *dbCache = NULL;
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
  if (NULL == dbCache) {
    ctgDebug("db %s not in cache", dbFName);
    return TSDB_CODE_SUCCESS;
  }  
  
  char *stb = taosHashAcquire(dbCache->stbCache, &suid, sizeof(suid));
  if (NULL == stb) {
    ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", suid, dbFName);
    return TSDB_CODE_SUCCESS;
  }

  *stbName = taosStrdup(stb);

  taosHashRelease(dbCache->stbCache, stb);

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
729
int32_t ctgChkAuthFromCache(SCatalog *pCtg, SUserAuthInfo *pReq, bool *inCache, SCtgAuthRsp *pRes) {
D
dapan1121 已提交
730
  if (IS_SYS_DBNAME(pReq->tbName.dbname)) {
731
    *inCache = true;
D
dapan1121 已提交
732 733
    pRes->pRawRes->pass = true;
    ctgDebug("sysdb %s, pass", pReq->tbName.dbname);
734 735 736
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
737
  SCtgUserAuth *pUser = (SCtgUserAuth *)taosHashGet(pCtg->userCache, pReq->user, strlen(pReq->user));
D
dapan1121 已提交
738
  if (NULL == pUser) {
D
dapan1121 已提交
739
    ctgDebug("user not in cache, user:%s", pReq->user);
D
dapan1121 已提交
740 741 742 743 744
    goto _return;
  }

  *inCache = true;

D
dapan1121 已提交
745
  ctgDebug("Got user from cache, user:%s", pReq->user);
D
dapan1121 已提交
746
  CTG_CACHE_HIT_INC(CTG_CI_USER, 1);
dengyihao's avatar
dengyihao 已提交
747

D
dapan1121 已提交
748 749 750
  SCtgAuthReq req = {0};
  req.pRawReq = pReq;
  req.onlyCache = true;
D
dapan1121 已提交
751 752

  CTG_LOCK(CTG_READ, &pUser->lock);
D
dapan1121 已提交
753 754
  memcpy(&req.authInfo, &pUser->userAuth, sizeof(pUser->userAuth));
  int32_t code = ctgChkSetAuthRes(pCtg, &req, pRes);
D
dapan1121 已提交
755
  CTG_UNLOCK(CTG_READ, &pUser->lock);
D
dapan1121 已提交
756
  CTG_ERR_JRET(code);
dengyihao's avatar
dengyihao 已提交
757

D
dapan1121 已提交
758 759
  if (pRes->metaNotExists) {
    goto _return;
D
dapan1121 已提交
760 761
  }

D
dapan1121 已提交
762
  CTG_RET(code);
D
dapan1121 已提交
763 764 765 766

_return:

  *inCache = false;
D
dapan1121 已提交
767
  CTG_CACHE_NHIT_INC(CTG_CI_USER, 1);
D
dapan1121 已提交
768
  ctgDebug("Get user from cache failed, user:%s, metaNotExists:%d, code:%d", pReq->user, pRes->metaNotExists, code);
dengyihao's avatar
dengyihao 已提交
769

D
dapan1121 已提交
770
  return code;
D
dapan1121 已提交
771 772
}

D
dapan1121 已提交
773
void ctgDequeue(SCtgCacheOperation **op) {
D
dapan1121 已提交
774
  SCtgQNode *orig = gCtgMgmt.queue.head;
dengyihao's avatar
dengyihao 已提交
775

D
dapan1121 已提交
776 777 778
  SCtgQNode *node = gCtgMgmt.queue.head->next;
  gCtgMgmt.queue.head = gCtgMgmt.queue.head->next;

D
dapan1121 已提交
779
  CTG_QUEUE_DEC();
dengyihao's avatar
dengyihao 已提交
780

D
dapan1121 已提交
781 782
  taosMemoryFreeClear(orig);

D
dapan1121 已提交
783
  *op = node->op;
D
dapan1121 已提交
784 785
}

dengyihao's avatar
dengyihao 已提交
786
int32_t ctgEnqueue(SCatalog *pCtg, SCtgCacheOperation *operation) {
D
dapan1121 已提交
787 788 789
  SCtgQNode *node = taosMemoryCalloc(1, sizeof(SCtgQNode));
  if (NULL == node) {
    qError("calloc %d failed", (int32_t)sizeof(SCtgQNode));
D
dapan1121 已提交
790 791
    taosMemoryFree(operation->data);
    taosMemoryFree(operation);
D
dapan1121 已提交
792
    CTG_RET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
793 794
  }

dengyihao's avatar
dengyihao 已提交
795 796
  bool  syncOp = operation->syncOp;
  char *opName = gCtgCacheOperation[operation->opId].name;
D
dapan1121 已提交
797 798 799
  if (operation->syncOp) {
    tsem_init(&operation->rspSem, 0, 0);
  }
dengyihao's avatar
dengyihao 已提交
800

D
dapan1121 已提交
801
  node->op = operation;
D
dapan1121 已提交
802 803

  CTG_LOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);
804

D
dapan1121 已提交
805
  if (gCtgMgmt.queue.stopQueue) {
806 807 808 809
    ctgFreeQNode(node);
    CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);
    CTG_RET(TSDB_CODE_CTG_EXIT);
  }
810

D
dapan1121 已提交
811 812
  gCtgMgmt.queue.tail->next = node;
  gCtgMgmt.queue.tail = node;
813 814 815

  gCtgMgmt.queue.stopQueue = operation->stopQueue;

D
dapan1121 已提交
816 817
  CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);

D
dapan1121 已提交
818 819
  ctgDebug("action [%s] added into queue", opName);

D
dapan1121 已提交
820
  CTG_QUEUE_INC();
D
dapan1121 已提交
821
  CTG_STAT_RT_INC(numOfOpEnqueue, 1);
D
dapan1121 已提交
822 823 824

  tsem_post(&gCtgMgmt.queue.reqSem);

D
dapan1121 已提交
825
  if (syncOp) {
826 827 828
    if (!operation->unLocked) {
      CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock);
    }
D
dapan1121 已提交
829
    tsem_wait(&operation->rspSem);
830 831 832
    if (!operation->unLocked) {
      CTG_LOCK(CTG_READ, &gCtgMgmt.lock);
    }
D
dapan1121 已提交
833
    taosMemoryFree(operation);
D
dapan1121 已提交
834 835 836 837 838
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
839 840
int32_t ctgDropDbCacheEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId) {
  int32_t             code = 0;
D
dapan1121 已提交
841 842
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  op->opId = CTG_OP_DROP_DB_CACHE;
843
  op->syncOp = true;
dengyihao's avatar
dengyihao 已提交
844

D
dapan1121 已提交
845
  SCtgDropDBMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDBMsg));
D
dapan1121 已提交
846
  if (NULL == msg) {
D
dapan1121 已提交
847
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDBMsg));
D
dapan1121 已提交
848
    taosMemoryFree(op);
D
dapan1121 已提交
849
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
850 851 852
  }

  char *p = strchr(dbFName, '.');
D
dapan1121 已提交
853
  if (p && IS_SYS_DBNAME(p + 1)) {
D
dapan1121 已提交
854 855 856 857
    dbFName = p + 1;
  }

  msg->pCtg = pCtg;
D
dapan1121 已提交
858
  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
D
dapan1121 已提交
859 860
  msg->dbId = dbId;

D
dapan1121 已提交
861
  op->data = msg;
D
dapan1121 已提交
862

D
dapan1121 已提交
863
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));
D
dapan1121 已提交
864 865 866 867 868 869 870 871

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
872 873
int32_t ctgDropDbVgroupEnqueue(SCatalog *pCtg, const char *dbFName, bool syncOp) {
  int32_t             code = 0;
D
dapan1121 已提交
874 875 876
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  op->opId = CTG_OP_DROP_DB_VGROUP;
  op->syncOp = syncOp;
dengyihao's avatar
dengyihao 已提交
877

D
dapan1121 已提交
878 879 880
  SCtgDropDbVgroupMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDbVgroupMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDbVgroupMsg));
D
dapan1121 已提交
881
    taosMemoryFree(op);
D
dapan1121 已提交
882
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
883 884 885
  }

  char *p = strchr(dbFName, '.');
D
dapan1121 已提交
886
  if (p && IS_SYS_DBNAME(p + 1)) {
D
dapan1121 已提交
887 888 889 890
    dbFName = p + 1;
  }

  msg->pCtg = pCtg;
D
dapan1121 已提交
891
  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
D
dapan1121 已提交
892

D
dapan1121 已提交
893
  op->data = msg;
D
dapan1121 已提交
894

D
dapan1121 已提交
895
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));
D
dapan1121 已提交
896 897 898 899 900 901 902 903

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
904 905 906
int32_t ctgDropStbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid,
                              bool syncOp) {
  int32_t             code = 0;
D
dapan1121 已提交
907 908 909
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  op->opId = CTG_OP_DROP_STB_META;
  op->syncOp = syncOp;
dengyihao's avatar
dengyihao 已提交
910

D
dapan1121 已提交
911
  SCtgDropStbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropStbMetaMsg));
D
dapan1121 已提交
912
  if (NULL == msg) {
D
dapan1121 已提交
913
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropStbMetaMsg));
D
dapan1121 已提交
914
    taosMemoryFree(op);
D
dapan1121 已提交
915
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
916 917 918
  }

  msg->pCtg = pCtg;
D
dapan1121 已提交
919 920
  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  tstrncpy(msg->stbName, stbName, sizeof(msg->stbName));
D
dapan1121 已提交
921 922 923
  msg->dbId = dbId;
  msg->suid = suid;

D
dapan1121 已提交
924
  op->data = msg;
D
dapan1121 已提交
925

D
dapan1121 已提交
926
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));
D
dapan1121 已提交
927 928 929 930 931 932 933 934

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
935 936
int32_t ctgDropTbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncOp) {
  int32_t             code = 0;
D
dapan1121 已提交
937 938 939
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  op->opId = CTG_OP_DROP_TB_META;
  op->syncOp = syncOp;
dengyihao's avatar
dengyihao 已提交
940

D
dapan1121 已提交
941
  SCtgDropTblMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTblMetaMsg));
D
dapan1121 已提交
942
  if (NULL == msg) {
D
dapan1121 已提交
943
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTblMetaMsg));
D
dapan1121 已提交
944
    taosMemoryFree(op);
D
dapan1121 已提交
945
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
946 947 948
  }

  msg->pCtg = pCtg;
D
dapan1121 已提交
949 950
  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  tstrncpy(msg->tbName, tbName, sizeof(msg->tbName));
D
dapan1121 已提交
951 952
  msg->dbId = dbId;

D
dapan1121 已提交
953
  op->data = msg;
D
dapan1121 已提交
954

D
dapan1121 已提交
955
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));
D
dapan1121 已提交
956 957 958 959 960 961 962 963

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
964 965
int32_t ctgUpdateVgroupEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, SDBVgInfo *dbInfo, bool syncOp) {
  int32_t             code = 0;
D
dapan1121 已提交
966 967 968
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  op->opId = CTG_OP_UPDATE_VGROUP;
  op->syncOp = syncOp;
dengyihao's avatar
dengyihao 已提交
969

D
dapan1121 已提交
970 971 972
  SCtgUpdateVgMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateVgMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg));
D
dapan1121 已提交
973
    taosMemoryFree(op);
974
    freeVgInfo(dbInfo);
D
dapan1121 已提交
975
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
976 977 978
  }

  char *p = strchr(dbFName, '.');
D
dapan1121 已提交
979
  if (p && IS_SYS_DBNAME(p + 1)) {
D
dapan1121 已提交
980 981 982
    dbFName = p + 1;
  }

983 984 985 986 987 988
  code = ctgMakeVgArray(dbInfo);
  if (code) {
    taosMemoryFree(op);
    taosMemoryFree(msg);
    freeVgInfo(dbInfo);
    CTG_ERR_RET(code);
989 990
  }

D
dapan1121 已提交
991
  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
D
dapan1121 已提交
992 993 994 995
  msg->pCtg = pCtg;
  msg->dbId = dbId;
  msg->dbInfo = dbInfo;

D
dapan1121 已提交
996
  op->data = msg;
D
dapan1121 已提交
997

D
dapan1121 已提交
998
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));
D
dapan1121 已提交
999 1000 1001 1002 1003

  return TSDB_CODE_SUCCESS;

_return:

1004
  freeVgInfo(dbInfo);
D
dapan1121 已提交
1005 1006 1007
  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1008 1009
int32_t ctgUpdateTbMetaEnqueue(SCatalog *pCtg, STableMetaOutput *output, bool syncOp) {
  int32_t             code = 0;
D
dapan1121 已提交
1010 1011 1012
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  op->opId = CTG_OP_UPDATE_TB_META;
  op->syncOp = syncOp;
dengyihao's avatar
dengyihao 已提交
1013

D
dapan1121 已提交
1014
  SCtgUpdateTbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbMetaMsg));
D
dapan1121 已提交
1015
  if (NULL == msg) {
D
dapan1121 已提交
1016
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbMetaMsg));
D
dapan1121 已提交
1017
    taosMemoryFree(op);
D
dapan1121 已提交
1018
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
1019 1020 1021
  }

  char *p = strchr(output->dbFName, '.');
D
dapan1121 已提交
1022
  if (p && IS_SYS_DBNAME(p + 1)) {
D
dapan1121 已提交
1023 1024
    int32_t len = strlen(p + 1);
    memmove(output->dbFName, p + 1, len >= TSDB_DB_FNAME_LEN ? TSDB_DB_FNAME_LEN - 1 : len);
D
dapan1121 已提交
1025 1026 1027
  }

  msg->pCtg = pCtg;
D
dapan1121 已提交
1028
  msg->pMeta = output;
D
dapan1121 已提交
1029

D
dapan1121 已提交
1030
  op->data = msg;
D
dapan1121 已提交
1031

D
dapan1121 已提交
1032
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));
D
dapan1121 已提交
1033 1034

  return TSDB_CODE_SUCCESS;
dengyihao's avatar
dengyihao 已提交
1035

D
dapan1121 已提交
1036 1037
_return:

D
dapan1121 已提交
1038 1039 1040 1041 1042
  if (output) {
    taosMemoryFree(output->tbMeta);
    taosMemoryFree(output);
  }

D
dapan1121 已提交
1043 1044 1045
  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1046 1047
int32_t ctgUpdateVgEpsetEnqueue(SCatalog *pCtg, char *dbFName, int32_t vgId, SEpSet *pEpSet) {
  int32_t             code = 0;
D
dapan1121 已提交
1048 1049
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  op->opId = CTG_OP_UPDATE_VG_EPSET;
dengyihao's avatar
dengyihao 已提交
1050

D
dapan1121 已提交
1051 1052 1053
  SCtgUpdateEpsetMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateEpsetMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateEpsetMsg));
D
dapan1121 已提交
1054
    taosMemoryFree(op);
D
dapan1121 已提交
1055
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
1056 1057 1058
  }

  msg->pCtg = pCtg;
D
dapan1121 已提交
1059
  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
D
dapan1121 已提交
1060 1061 1062
  msg->vgId = vgId;
  msg->epSet = *pEpSet;

D
dapan1121 已提交
1063
  op->data = msg;
D
dapan1121 已提交
1064

D
dapan1121 已提交
1065
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));
D
dapan1121 已提交
1066 1067

  return TSDB_CODE_SUCCESS;
dengyihao's avatar
dengyihao 已提交
1068

D
dapan1121 已提交
1069 1070 1071 1072 1073
_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1074 1075
int32_t ctgUpdateUserEnqueue(SCatalog *pCtg, SGetUserAuthRsp *pAuth, bool syncOp) {
  int32_t             code = 0;
D
dapan1121 已提交
1076 1077 1078
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  op->opId = CTG_OP_UPDATE_USER;
  op->syncOp = syncOp;
dengyihao's avatar
dengyihao 已提交
1079

D
dapan1121 已提交
1080 1081 1082
  SCtgUpdateUserMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateUserMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateUserMsg));
D
dapan1121 已提交
1083
    taosMemoryFree(op);
D
dapan1121 已提交
1084
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
1085 1086 1087 1088 1089
  }

  msg->pCtg = pCtg;
  msg->userAuth = *pAuth;

D
dapan1121 已提交
1090
  op->data = msg;
D
dapan1121 已提交
1091

D
dapan1121 已提交
1092
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));
dengyihao's avatar
dengyihao 已提交
1093

D
dapan1121 已提交
1094
  return TSDB_CODE_SUCCESS;
dengyihao's avatar
dengyihao 已提交
1095

D
dapan1121 已提交
1096 1097 1098
_return:

  tFreeSGetUserAuthRsp(pAuth);
dengyihao's avatar
dengyihao 已提交
1099

D
dapan1121 已提交
1100 1101 1102
  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1103 1104
int32_t ctgUpdateTbIndexEnqueue(SCatalog *pCtg, STableIndex **pIndex, bool syncOp) {
  int32_t             code = 0;
D
dapan1121 已提交
1105 1106 1107
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  op->opId = CTG_OP_UPDATE_TB_INDEX;
  op->syncOp = syncOp;
dengyihao's avatar
dengyihao 已提交
1108

D
dapan1121 已提交
1109 1110 1111
  SCtgUpdateTbIndexMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbIndexMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbIndexMsg));
D
dapan1121 已提交
1112
    taosMemoryFree(op);
D
dapan1121 已提交
1113
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
1114 1115 1116
  }

  msg->pCtg = pCtg;
D
dapan1121 已提交
1117
  msg->pIndex = *pIndex;
D
dapan1121 已提交
1118 1119 1120 1121

  op->data = msg;

  CTG_ERR_JRET(ctgEnqueue(pCtg, op));
D
dapan1121 已提交
1122 1123

  *pIndex = NULL;
D
dapan1121 已提交
1124
  return TSDB_CODE_SUCCESS;
dengyihao's avatar
dengyihao 已提交
1125

D
dapan1121 已提交
1126 1127
_return:

D
dapan1121 已提交
1128
  taosArrayDestroyEx((*pIndex)->pIndex, tFreeSTableIndexInfo);
D
dapan1121 已提交
1129
  taosMemoryFreeClear(*pIndex);
dengyihao's avatar
dengyihao 已提交
1130

D
dapan1121 已提交
1131 1132 1133
  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1134 1135
int32_t ctgDropTbIndexEnqueue(SCatalog *pCtg, SName *pName, bool syncOp) {
  int32_t             code = 0;
D
dapan1121 已提交
1136 1137 1138
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  op->opId = CTG_OP_DROP_TB_INDEX;
  op->syncOp = syncOp;
dengyihao's avatar
dengyihao 已提交
1139

D
dapan1121 已提交
1140 1141 1142
  SCtgDropTbIndexMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTbIndexMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTbIndexMsg));
D
dapan1121 已提交
1143
    taosMemoryFree(op);
D
dapan1121 已提交
1144
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
1145 1146 1147 1148 1149 1150 1151 1152 1153
  }

  msg->pCtg = pCtg;
  tNameGetFullDbName(pName, msg->dbFName);
  strcpy(msg->tbName, pName->tname);

  op->data = msg;

  CTG_ERR_JRET(ctgEnqueue(pCtg, op));
dengyihao's avatar
dengyihao 已提交
1154

D
dapan1121 已提交
1155
  return TSDB_CODE_SUCCESS;
dengyihao's avatar
dengyihao 已提交
1156

D
dapan1121 已提交
1157 1158 1159 1160 1161
_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1162 1163
int32_t ctgClearCacheEnqueue(SCatalog *pCtg, bool freeCtg, bool stopQueue, bool syncOp) {
  int32_t             code = 0;
D
dapan1121 已提交
1164 1165 1166
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  op->opId = CTG_OP_CLEAR_CACHE;
  op->syncOp = syncOp;
D
dapan1121 已提交
1167
  op->stopQueue = stopQueue;
1168
  op->unLocked = true;
dengyihao's avatar
dengyihao 已提交
1169

D
dapan1121 已提交
1170 1171 1172
  SCtgClearCacheMsg *msg = taosMemoryMalloc(sizeof(SCtgClearCacheMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgClearCacheMsg));
D
dapan1121 已提交
1173
    taosMemoryFree(op);
D
dapan1121 已提交
1174
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
1175 1176 1177
  }

  msg->pCtg = pCtg;
D
dapan1121 已提交
1178
  msg->freeCtg = freeCtg;
D
dapan1121 已提交
1179 1180 1181
  op->data = msg;

  CTG_ERR_JRET(ctgEnqueue(pCtg, op));
dengyihao's avatar
dengyihao 已提交
1182

D
dapan1121 已提交
1183
  return TSDB_CODE_SUCCESS;
dengyihao's avatar
dengyihao 已提交
1184

D
dapan1121 已提交
1185 1186 1187 1188 1189
_return:

  CTG_RET(code);
}

D
dapan1121 已提交
1190 1191 1192 1193 1194 1195
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) {
  mgmt->slotRIdx = 0;
  mgmt->slotNum = rentSec / CTG_RENT_SLOT_SECOND;
  mgmt->type = type;

  size_t msgSize = sizeof(SCtgRentSlot) * mgmt->slotNum;
dengyihao's avatar
dengyihao 已提交
1196

D
dapan1121 已提交
1197 1198 1199
  mgmt->slots = taosMemoryCalloc(1, msgSize);
  if (NULL == mgmt->slots) {
    qError("calloc %d failed", (int32_t)msgSize);
D
dapan1121 已提交
1200
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
1201 1202 1203
  }

  qDebug("meta rent initialized, type:%d, slotNum:%d", type, mgmt->slotNum);
dengyihao's avatar
dengyihao 已提交
1204

D
dapan1121 已提交
1205 1206 1207 1208 1209 1210 1211
  return TSDB_CODE_SUCCESS;
}

int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size) {
  int16_t widx = abs((int)(id % mgmt->slotNum));

  SCtgRentSlot *slot = &mgmt->slots[widx];
dengyihao's avatar
dengyihao 已提交
1212 1213
  int32_t       code = 0;

D
dapan1121 已提交
1214 1215 1216 1217
  CTG_LOCK(CTG_WRITE, &slot->lock);
  if (NULL == slot->meta) {
    slot->meta = taosArrayInit(CTG_DEFAULT_RENT_SLOT_SIZE, size);
    if (NULL == slot->meta) {
dengyihao's avatar
dengyihao 已提交
1218 1219
      qError("taosArrayInit %d failed, id:0x%" PRIx64 ", slot idx:%d, type:%d", CTG_DEFAULT_RENT_SLOT_SIZE, id, widx,
             mgmt->type);
D
dapan1121 已提交
1220
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
1221 1222 1223 1224
    }
  }

  if (NULL == taosArrayPush(slot->meta, meta)) {
dengyihao's avatar
dengyihao 已提交
1225
    qError("taosArrayPush meta to rent failed, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type);
D
dapan1121 已提交
1226
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
1227 1228 1229 1230
  }

  slot->needSort = true;

dengyihao's avatar
dengyihao 已提交
1231
  qDebug("add meta to rent, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type);
D
dapan1121 已提交
1232 1233 1234 1235 1236 1237 1238

_return:

  CTG_UNLOCK(CTG_WRITE, &slot->lock);
  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1239 1240
int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t sortCompare,
                          __compar_fn_t searchCompare) {
D
dapan1121 已提交
1241 1242 1243
  int16_t widx = abs((int)(id % mgmt->slotNum));

  SCtgRentSlot *slot = &mgmt->slots[widx];
dengyihao's avatar
dengyihao 已提交
1244
  int32_t       code = 0;
D
dapan1121 已提交
1245 1246 1247

  CTG_LOCK(CTG_WRITE, &slot->lock);
  if (NULL == slot->meta) {
dengyihao's avatar
dengyihao 已提交
1248
    qDebug("empty meta slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type);
D
dapan1121 已提交
1249 1250 1251 1252
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  if (slot->needSort) {
dengyihao's avatar
dengyihao 已提交
1253 1254
    qDebug("meta slot before sorte, slot idx:%d, type:%d, size:%d", widx, mgmt->type,
           (int32_t)taosArrayGetSize(slot->meta));
D
dapan1121 已提交
1255 1256 1257 1258 1259 1260 1261
    taosArraySort(slot->meta, sortCompare);
    slot->needSort = false;
    qDebug("meta slot sorted, slot idx:%d, type:%d, size:%d", widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta));
  }

  void *orig = taosArraySearch(slot->meta, &id, searchCompare, TD_EQ);
  if (NULL == orig) {
dengyihao's avatar
dengyihao 已提交
1262 1263
    qDebug("meta not found in slot, id:0x%" PRIx64 ", slot idx:%d, type:%d, size:%d", id, widx, mgmt->type,
           (int32_t)taosArrayGetSize(slot->meta));
D
dapan1121 已提交
1264 1265 1266 1267 1268
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  memcpy(orig, meta, size);

dengyihao's avatar
dengyihao 已提交
1269
  qDebug("meta in rent updated, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type);
D
dapan1121 已提交
1270 1271 1272 1273 1274 1275

_return:

  CTG_UNLOCK(CTG_WRITE, &slot->lock);

  if (code) {
dengyihao's avatar
dengyihao 已提交
1276 1277
    qDebug("meta in rent update failed, will try to add it, code:%x, id:0x%" PRIx64 ", slot idx:%d, type:%d", code, id,
           widx, mgmt->type);
D
dapan1121 已提交
1278 1279 1280 1281 1282 1283 1284 1285 1286 1287
    CTG_RET(ctgMetaRentAdd(mgmt, meta, id, size));
  }

  CTG_RET(code);
}

int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t sortCompare, __compar_fn_t searchCompare) {
  int16_t widx = abs((int)(id % mgmt->slotNum));

  SCtgRentSlot *slot = &mgmt->slots[widx];
dengyihao's avatar
dengyihao 已提交
1288 1289
  int32_t       code = 0;

D
dapan1121 已提交
1290 1291
  CTG_LOCK(CTG_WRITE, &slot->lock);
  if (NULL == slot->meta) {
dengyihao's avatar
dengyihao 已提交
1292
    qError("empty meta slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type);
D
dapan1121 已提交
1293 1294 1295 1296 1297 1298 1299 1300 1301 1302 1303
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  if (slot->needSort) {
    taosArraySort(slot->meta, sortCompare);
    slot->needSort = false;
    qDebug("meta slot sorted, slot idx:%d, type:%d", widx, mgmt->type);
  }

  int32_t idx = taosArraySearchIdx(slot->meta, &id, searchCompare, TD_EQ);
  if (idx < 0) {
dengyihao's avatar
dengyihao 已提交
1304
    qError("meta not found in slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type);
D
dapan1121 已提交
1305 1306 1307 1308 1309
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  taosArrayRemove(slot->meta, idx);

dengyihao's avatar
dengyihao 已提交
1310
  qDebug("meta in rent removed, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, widx, mgmt->type);
D
dapan1121 已提交
1311 1312 1313 1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326

_return:

  CTG_UNLOCK(CTG_WRITE, &slot->lock);

  CTG_RET(code);
}

int32_t ctgMetaRentGetImpl(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
  int16_t ridx = atomic_add_fetch_16(&mgmt->slotRIdx, 1);
  if (ridx >= mgmt->slotNum) {
    ridx %= mgmt->slotNum;
    atomic_store_16(&mgmt->slotRIdx, ridx);
  }

  SCtgRentSlot *slot = &mgmt->slots[ridx];
dengyihao's avatar
dengyihao 已提交
1327 1328
  int32_t       code = 0;

D
dapan1121 已提交
1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346
  CTG_LOCK(CTG_READ, &slot->lock);
  if (NULL == slot->meta) {
    qDebug("empty meta in slot:%d, type:%d", ridx, mgmt->type);
    *num = 0;
    goto _return;
  }

  size_t metaNum = taosArrayGetSize(slot->meta);
  if (metaNum <= 0) {
    qDebug("no meta in slot:%d, type:%d", ridx, mgmt->type);
    *num = 0;
    goto _return;
  }

  size_t msize = metaNum * size;
  *res = taosMemoryMalloc(msize);
  if (NULL == *res) {
    qError("malloc %d failed", (int32_t)msize);
D
dapan1121 已提交
1347
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364 1365 1366 1367 1368 1369 1370 1371 1372 1373 1374 1375 1376 1377 1378 1379 1380 1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391 1392 1393
  }

  void *meta = taosArrayGet(slot->meta, 0);

  memcpy(*res, meta, msize);

  *num = (uint32_t)metaNum;

  qDebug("Got %d meta from rent, type:%d", (int32_t)metaNum, mgmt->type);

_return:

  CTG_UNLOCK(CTG_READ, &slot->lock);

  CTG_RET(code);
}

int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
  while (true) {
    int64_t msec = taosGetTimestampMs();
    int64_t lsec = atomic_load_64(&mgmt->lastReadMsec);
    if ((msec - lsec) < CTG_RENT_SLOT_SECOND * 1000) {
      *res = NULL;
      *num = 0;
      qDebug("too short time period to get expired meta, type:%d", mgmt->type);
      return TSDB_CODE_SUCCESS;
    }

    if (lsec != atomic_val_compare_exchange_64(&mgmt->lastReadMsec, lsec, msec)) {
      continue;
    }

    break;
  }

  CTG_ERR_RET(ctgMetaRentGetImpl(mgmt, res, num, size));

  return TSDB_CODE_SUCCESS;
}

int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
  int32_t code = 0;

  SCtgDBCache newDBCache = {0};
  newDBCache.dbId = dbId;

dengyihao's avatar
dengyihao 已提交
1394 1395
  newDBCache.tbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY),
                                    true, HASH_ENTRY_LOCK);
D
dapan1121 已提交
1396
  if (NULL == newDBCache.tbCache) {
D
dapan1121 已提交
1397
    ctgError("taosHashInit %d metaCache failed", gCtgMgmt.cfg.maxTblCacheNum);
D
dapan1121 已提交
1398
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
1399 1400
  }

dengyihao's avatar
dengyihao 已提交
1401 1402
  newDBCache.stbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT),
                                     true, HASH_ENTRY_LOCK);
D
dapan1121 已提交
1403
  if (NULL == newDBCache.stbCache) {
D
dapan1121 已提交
1404
    ctgError("taosHashInit %d stbCache failed", gCtgMgmt.cfg.maxTblCacheNum);
D
dapan1121 已提交
1405
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
1406 1407 1408 1409 1410 1411 1412 1413
  }

  code = taosHashPut(pCtg->dbCache, dbFName, strlen(dbFName), &newDBCache, sizeof(SCtgDBCache));
  if (code) {
    if (HASH_NODE_EXIST(code)) {
      ctgDebug("db already in cache, dbFName:%s", dbFName);
      goto _return;
    }
dengyihao's avatar
dengyihao 已提交
1414

D
dapan1121 已提交
1415
    ctgError("taosHashPut db to cache failed, dbFName:%s", dbFName);
D
dapan1121 已提交
1416
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
1417 1418
  }

D
dapan1121 已提交
1419
  CTG_CACHE_NUM_INC(CTG_CI_DB, 1);
dengyihao's avatar
dengyihao 已提交
1420

D
dapan1121 已提交
1421
  SDbVgVersion vgVersion = {.dbId = newDBCache.dbId, .vgVersion = -1, .stateTs = 0};
D
dapan1121 已提交
1422
  tstrncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
D
dapan1121 已提交
1423

dengyihao's avatar
dengyihao 已提交
1424
  ctgDebug("db added to cache, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId);
D
dapan1121 已提交
1425

D
dapan1121 已提交
1426 1427
  if (!IS_SYS_DBNAME(dbFName)) {
    CTG_ERR_RET(ctgMetaRentAdd(&pCtg->dbRent, &vgVersion, dbId, sizeof(SDbVgVersion)));
D
dapan1121 已提交
1428

D
dapan1121 已提交
1429 1430
    ctgDebug("db added to rent, dbFName:%s, vgVersion:%d, dbId:0x%" PRIx64, dbFName, vgVersion.vgVersion, dbId);
  }
D
dapan1121 已提交
1431 1432 1433 1434 1435 1436 1437 1438 1439 1440

  return TSDB_CODE_SUCCESS;

_return:

  ctgFreeDbCache(&newDBCache);

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1441
void ctgRemoveStbRent(SCatalog *pCtg, SCtgDBCache *dbCache) {
D
dapan1121 已提交
1442 1443 1444
  if (NULL == dbCache->stbCache) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
1445

D
dapan1121 已提交
1446 1447 1448 1449
  void *pIter = taosHashIterate(dbCache->stbCache, NULL);
  while (pIter) {
    uint64_t *suid = NULL;
    suid = taosHashGetKey(pIter, NULL);
D
dapan1121 已提交
1450

dengyihao's avatar
dengyihao 已提交
1451 1452 1453
    if (TSDB_CODE_SUCCESS ==
        ctgMetaRentRemove(&pCtg->stbRent, *suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare)) {
      ctgDebug("stb removed from rent, suid:0x%" PRIx64, *suid);
D
dapan1121 已提交
1454
    }
dengyihao's avatar
dengyihao 已提交
1455

D
dapan1121 已提交
1456
    pIter = taosHashIterate(dbCache->stbCache, pIter);
D
dapan1121 已提交
1457 1458 1459
  }
}

dengyihao's avatar
dengyihao 已提交
1460
int32_t ctgRemoveDBFromCache(SCatalog *pCtg, SCtgDBCache *dbCache, const char *dbFName) {
D
dapan1121 已提交
1461
  uint64_t dbId = dbCache->dbId;
dengyihao's avatar
dengyihao 已提交
1462 1463

  ctgInfo("start to remove db from cache, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbCache->dbId);
D
dapan1121 已提交
1464

D
dapan1121 已提交
1465
  CTG_LOCK(CTG_WRITE, &dbCache->dbLock);
D
dapan1121 已提交
1466

D
dapan1121 已提交
1467
  atomic_store_8(&dbCache->deleted, 1);
D
dapan1121 已提交
1468
  ctgRemoveStbRent(pCtg, dbCache);
D
dapan1121 已提交
1469 1470
  ctgFreeDbCache(dbCache);

D
dapan1121 已提交
1471 1472 1473
  CTG_UNLOCK(CTG_WRITE, &dbCache->dbLock);

  CTG_ERR_RET(ctgMetaRentRemove(&pCtg->dbRent, dbId, ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare));
dengyihao's avatar
dengyihao 已提交
1474
  ctgDebug("db removed from rent, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId);
D
dapan1121 已提交
1475 1476 1477 1478 1479 1480

  if (taosHashRemove(pCtg->dbCache, dbFName, strlen(dbFName))) {
    ctgInfo("taosHashRemove from dbCache failed, may be removed, dbFName:%s", dbFName);
    CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
  }

D
dapan1121 已提交
1481
  CTG_CACHE_NUM_DEC(CTG_CI_DB, 1);
dengyihao's avatar
dengyihao 已提交
1482 1483
  ctgInfo("db removed from cache, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId);

D
dapan1121 已提交
1484 1485 1486
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1487 1488
int32_t ctgGetAddDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId, SCtgDBCache **pCache) {
  int32_t      code = 0;
D
dapan1121 已提交
1489 1490
  SCtgDBCache *dbCache = NULL;
  ctgGetDBCache(pCtg, dbFName, &dbCache);
dengyihao's avatar
dengyihao 已提交
1491

D
dapan1121 已提交
1492
  if (dbCache) {
dengyihao's avatar
dengyihao 已提交
1493
    // TODO OPEN IT
D
dapan1121 已提交
1494 1495 1496 1497 1498 1499 1500 1501 1502 1503 1504 1505 1506 1507 1508 1509
#if 0    
    if (dbCache->dbId == dbId) {
      *pCache = dbCache;
      return TSDB_CODE_SUCCESS;
    }
#else
    if (0 == dbId) {
      *pCache = dbCache;
      return TSDB_CODE_SUCCESS;
    }

    if (dbId && (dbCache->dbId == 0)) {
      dbCache->dbId = dbId;
      *pCache = dbCache;
      return TSDB_CODE_SUCCESS;
    }
dengyihao's avatar
dengyihao 已提交
1510

D
dapan1121 已提交
1511 1512 1513 1514 1515 1516 1517
    if (dbCache->dbId == dbId) {
      *pCache = dbCache;
      return TSDB_CODE_SUCCESS;
    }
#endif
    CTG_ERR_RET(ctgRemoveDBFromCache(pCtg, dbCache, dbFName));
  }
dengyihao's avatar
dengyihao 已提交
1518

D
dapan1121 已提交
1519 1520 1521 1522 1523 1524 1525 1526 1527
  CTG_ERR_RET(ctgAddNewDBCache(pCtg, dbFName, dbId));

  ctgGetDBCache(pCtg, dbFName, &dbCache);

  *pCache = dbCache;

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1528 1529
int32_t ctgUpdateRentStbVersion(SCatalog *pCtg, char *dbFName, char *tbName, uint64_t dbId, uint64_t suid,
                                SCtgTbCache *pCache) {
D
dapan1121 已提交
1530 1531 1532 1533 1534 1535 1536 1537 1538
  SSTableVersion metaRent = {.dbId = dbId, .suid = suid};
  if (pCache->pMeta) {
    metaRent.sversion = pCache->pMeta->sversion;
    metaRent.tversion = pCache->pMeta->tversion;
  }

  if (pCache->pIndex) {
    metaRent.smaVer = pCache->pIndex->version;
  }
dengyihao's avatar
dengyihao 已提交
1539

D
dapan1121 已提交
1540 1541
  tstrncpy(metaRent.dbFName, dbFName, sizeof(metaRent.dbFName));
  tstrncpy(metaRent.stbName, tbName, sizeof(metaRent.stbName));
D
dapan1121 已提交
1542

dengyihao's avatar
dengyihao 已提交
1543 1544
  CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->stbRent, &metaRent, metaRent.suid, sizeof(SSTableVersion),
                                ctgStbVersionSortCompare, ctgStbVersionSearchCompare));
D
dapan1121 已提交
1545

dengyihao's avatar
dengyihao 已提交
1546 1547
  ctgDebug("db %s,0x%" PRIx64 " stb %s,0x%" PRIx64 " sver %d tver %d smaVer %d updated to stbRent", dbFName, dbId,
           tbName, suid, metaRent.sversion, metaRent.tversion, metaRent.smaVer);
D
dapan1121 已提交
1548

dengyihao's avatar
dengyihao 已提交
1549 1550
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
1551

dengyihao's avatar
dengyihao 已提交
1552 1553
int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, uint64_t dbId, char *tbName,
                              STableMeta *meta, int32_t metaSize) {
D
dapan1121 已提交
1554 1555
  if (NULL == dbCache->tbCache || NULL == dbCache->stbCache) {
    taosMemoryFree(meta);
dengyihao's avatar
dengyihao 已提交
1556
    ctgError("db is dropping, dbId:0x%" PRIx64, dbCache->dbId);
D
dapan1121 已提交
1557 1558 1559
    CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
  }

dengyihao's avatar
dengyihao 已提交
1560 1561 1562 1563 1564
  bool         isStb = meta->tableType == TSDB_SUPER_TABLE;
  SCtgTbCache *pCache = taosHashGet(dbCache->tbCache, tbName, strlen(tbName));
  STableMeta  *orig = (pCache ? pCache->pMeta : NULL);
  int8_t       origType = 0;

D
dapan1121 已提交
1565 1566 1567
  if (orig) {
    origType = orig->tableType;

dengyihao's avatar
dengyihao 已提交
1568 1569
    if (origType == meta->tableType && orig->uid == meta->uid &&
        (origType == TSDB_CHILD_TABLE || (orig->sversion >= meta->sversion && orig->tversion >= meta->tversion))) {
D
dapan1121 已提交
1570 1571
      taosMemoryFree(meta);
      ctgDebug("ignore table %s meta update", tbName);
D
dapan1121 已提交
1572 1573
      return TSDB_CODE_SUCCESS;
    }
dengyihao's avatar
dengyihao 已提交
1574

D
dapan1121 已提交
1575
    if (origType == TSDB_SUPER_TABLE) {
D
dapan1121 已提交
1576
      if (taosHashRemove(dbCache->stbCache, &orig->suid, sizeof(orig->suid))) {
dengyihao's avatar
dengyihao 已提交
1577
        ctgError("stb not exist in stbCache, dbFName:%s, stb:%s, suid:0x%" PRIx64, dbFName, tbName, orig->suid);
D
dapan1121 已提交
1578
      } else {
dengyihao's avatar
dengyihao 已提交
1579
        ctgDebug("stb removed from stbCache, dbFName:%s, stb:%s, suid:0x%" PRIx64, dbFName, tbName, orig->suid);
D
dapan1121 已提交
1580 1581 1582 1583
      }
    }
  }

D
dapan1121 已提交
1584 1585 1586 1587 1588
  if (NULL == pCache) {
    SCtgTbCache cache = {0};
    cache.pMeta = meta;
    if (taosHashPut(dbCache->tbCache, tbName, strlen(tbName), &cache, sizeof(SCtgTbCache)) != 0) {
      ctgError("taosHashPut new tbCache failed, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
1589
      taosMemoryFree(meta);
D
dapan1121 已提交
1590
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
1591
    }
dengyihao's avatar
dengyihao 已提交
1592

D
dapan1121 已提交
1593 1594
    pCache = taosHashGet(dbCache->tbCache, tbName, strlen(tbName));
  } else {
1595
    CTG_LOCK(CTG_WRITE, &pCache->metaLock);
D
dapan1121 已提交
1596 1597 1598
    if (orig) {
      CTG_META_NUM_DEC(origType);
    }
D
dapan1121 已提交
1599 1600
    taosMemoryFree(pCache->pMeta);
    pCache->pMeta = meta;
1601
    CTG_UNLOCK(CTG_WRITE, &pCache->metaLock);
D
dapan1121 已提交
1602 1603
  }

D
dapan1121 已提交
1604
  CTG_META_NUM_INC(pCache->pMeta->tableType);
D
dapan1121 已提交
1605 1606 1607 1608 1609 1610 1611 1612

  ctgDebug("tbmeta updated to cache, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
  ctgdShowTableMeta(pCtg, tbName, meta);

  if (!isStb) {
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
1613
  if (taosHashPut(dbCache->stbCache, &meta->suid, sizeof(meta->suid), tbName, strlen(tbName) + 1) != 0) {
dengyihao's avatar
dengyihao 已提交
1614
    ctgError("taosHashPut to stable cache failed, suid:0x%" PRIx64, meta->suid);
D
dapan1121 已提交
1615
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
1616 1617
  }

dengyihao's avatar
dengyihao 已提交
1618 1619
  ctgDebug("stb 0x%" PRIx64 " updated to cache, dbFName:%s, tbName:%s, tbType:%d", meta->suid, dbFName, tbName,
           meta->tableType);
D
dapan1121 已提交
1620

D
dapan1121 已提交
1621 1622 1623
  if (pCache) {
    CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbId, meta->suid, pCache));
  }
dengyihao's avatar
dengyihao 已提交
1624

D
dapan1121 已提交
1625 1626 1627
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1628
int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, char *tbName, STableIndex **index) {
D
dapan1121 已提交
1629
  if (NULL == dbCache->tbCache) {
D
dapan1121 已提交
1630
    ctgFreeSTableIndex(*index);
D
dapan1121 已提交
1631
    taosMemoryFreeClear(*index);
dengyihao's avatar
dengyihao 已提交
1632
    ctgError("db is dropping, dbId:0x%" PRIx64, dbCache->dbId);
D
dapan1121 已提交
1633 1634 1635
    CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
  }

dengyihao's avatar
dengyihao 已提交
1636 1637 1638
  STableIndex *pIndex = *index;
  uint64_t     suid = pIndex->suid;
  SCtgTbCache *pCache = taosHashGet(dbCache->tbCache, tbName, strlen(tbName));
D
dapan1121 已提交
1639 1640 1641
  if (NULL == pCache) {
    SCtgTbCache cache = {0};
    cache.pIndex = pIndex;
dengyihao's avatar
dengyihao 已提交
1642

D
dapan1121 已提交
1643 1644
    if (taosHashPut(dbCache->tbCache, tbName, strlen(tbName), &cache, sizeof(cache)) != 0) {
      ctgFreeSTableIndex(*index);
D
dapan1121 已提交
1645 1646
      taosMemoryFreeClear(*index);
      ctgError("taosHashPut new tbCache failed, tbName:%s", tbName);
D
dapan1121 已提交
1647
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
1648 1649
    }

D
dapan1121 已提交
1650
    CTG_DB_NUM_INC(CTG_CI_TBL_SMA);
X
Xiaoyu Wang 已提交
1651

D
dapan1121 已提交
1652
    *index = NULL;
dengyihao's avatar
dengyihao 已提交
1653 1654
    ctgDebug("table %s index updated to cache, ver:%d, num:%d", tbName, pIndex->version,
             (int32_t)taosArrayGetSize(pIndex->pIndex));
D
dapan1121 已提交
1655

D
dapan1121 已提交
1656
    if (suid) {
D
dapan1121 已提交
1657
      CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbCache->dbId, pIndex->suid, &cache));
D
dapan1121 已提交
1658
    }
dengyihao's avatar
dengyihao 已提交
1659

D
dapan1121 已提交
1660 1661 1662
    return TSDB_CODE_SUCCESS;
  }

1663 1664
  CTG_LOCK(CTG_WRITE, &pCache->indexLock);

D
dapan1121 已提交
1665
  if (pCache->pIndex) {
D
dapan1121 已提交
1666 1667 1668
    if (0 == suid) {
      suid = pCache->pIndex->suid;
    }
D
dapan1121 已提交
1669 1670 1671 1672 1673
    taosArrayDestroyEx(pCache->pIndex->pIndex, tFreeSTableIndexInfo);
    taosMemoryFreeClear(pCache->pIndex);
  }

  pCache->pIndex = pIndex;
1674 1675
  CTG_UNLOCK(CTG_WRITE, &pCache->indexLock);

D
dapan1121 已提交
1676 1677
  *index = NULL;

dengyihao's avatar
dengyihao 已提交
1678 1679
  ctgDebug("table %s index updated to cache, ver:%d, num:%d", tbName, pIndex->version,
           (int32_t)taosArrayGetSize(pIndex->pIndex));
D
dapan1121 已提交
1680

D
dapan1121 已提交
1681 1682 1683
  if (suid) {
    CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbCache->dbId, suid, pCache));
  }
dengyihao's avatar
dengyihao 已提交
1684

D
dapan1121 已提交
1685 1686 1687
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1688 1689 1690 1691
int32_t ctgUpdateTbMetaToCache(SCatalog *pCtg, STableMetaOutput *pOut, bool syncReq) {
  STableMetaOutput *pOutput = NULL;
  int32_t           code = 0;

D
dapan1121 已提交
1692
  CTG_ERR_RET(ctgCloneMetaOutput(pOut, &pOutput));
D
dapan1121 已提交
1693 1694 1695
  code = ctgUpdateTbMetaEnqueue(pCtg, pOutput, syncReq);
  pOutput = NULL;
  CTG_ERR_JRET(code);
D
dapan1121 已提交
1696 1697

  return TSDB_CODE_SUCCESS;
dengyihao's avatar
dengyihao 已提交
1698

D
dapan1121 已提交
1699 1700 1701 1702 1703 1704
_return:

  ctgFreeSTableMetaOutput(pOutput);
  CTG_RET(code);
}

D
dapan1121 已提交
1705
void ctgClearAllInstance(void) {
dengyihao's avatar
dengyihao 已提交
1706
  SCatalog *pCtg = NULL;
1707

dengyihao's avatar
dengyihao 已提交
1708
  void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
1709
  while (pIter) {
dengyihao's avatar
dengyihao 已提交
1710
    pCtg = *(SCatalog **)pIter;
1711 1712

    if (pCtg) {
D
dapan1121 已提交
1713 1714 1715 1716 1717 1718 1719 1720
      ctgClearHandle(pCtg);
    }

    pIter = taosHashIterate(gCtgMgmt.pCluster, pIter);
  }
}

void ctgFreeAllInstance(void) {
dengyihao's avatar
dengyihao 已提交
1721
  SCatalog *pCtg = NULL;
D
dapan1121 已提交
1722

dengyihao's avatar
dengyihao 已提交
1723
  void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
D
dapan1121 已提交
1724
  while (pIter) {
dengyihao's avatar
dengyihao 已提交
1725
    pCtg = *(SCatalog **)pIter;
D
dapan1121 已提交
1726 1727 1728

    if (pCtg) {
      ctgFreeHandle(pCtg);
1729 1730 1731 1732 1733 1734 1735 1736
    }

    pIter = taosHashIterate(gCtgMgmt.pCluster, pIter);
  }

  taosHashClear(gCtgMgmt.pCluster);
}

X
Xiaoyu Wang 已提交
1737 1738 1739
int32_t ctgVgInfoIdComp(void const *lp, void const *rp) {
  int32_t     *key = (int32_t *)lp;
  SVgroupInfo *pVg = (SVgroupInfo *)rp;
1740 1741 1742 1743 1744 1745 1746 1747 1748 1749

  if (*key < pVg->vgId) {
    return -1;
  } else if (*key > pVg->vgId) {
    return 1;
  }

  return 0;
}

D
dapan1121 已提交
1750
int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
dengyihao's avatar
dengyihao 已提交
1751
  int32_t          code = 0;
D
dapan1121 已提交
1752
  SCtgUpdateVgMsg *msg = operation->data;
dengyihao's avatar
dengyihao 已提交
1753 1754 1755 1756
  SDBVgInfo       *dbInfo = msg->dbInfo;
  char            *dbFName = msg->dbFName;
  SCatalog        *pCtg = msg->pCtg;

D
dapan1121 已提交
1757
  if (pCtg->stopUpdate || NULL == dbInfo->vgHash) {
D
dapan1121 已提交
1758
    goto _return;
D
dapan1121 已提交
1759
  }
dengyihao's avatar
dengyihao 已提交
1760

D
dapan1121 已提交
1761
  if (dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) {
D
dapan1121 已提交
1762
    ctgDebug("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d, vgHashSize:%d", dbFName, dbInfo->vgHash,
dengyihao's avatar
dengyihao 已提交
1763
             dbInfo->vgVersion, taosHashGetSize(dbInfo->vgHash));
D
dapan1121 已提交
1764
    CTG_ERR_JRET(TSDB_CODE_APP_ERROR);
D
dapan1121 已提交
1765 1766
  }

dengyihao's avatar
dengyihao 已提交
1767
  bool         newAdded = false;
dengyihao's avatar
dengyihao 已提交
1768 1769
  SDbVgVersion vgVersion = {
      .dbId = msg->dbId, .vgVersion = dbInfo->vgVersion, .numOfTable = dbInfo->numOfTable, .stateTs = dbInfo->stateTs};
D
dapan1121 已提交
1770 1771

  SCtgDBCache *dbCache = NULL;
D
dapan1121 已提交
1772
  CTG_ERR_JRET(ctgGetAddDBCache(msg->pCtg, dbFName, msg->dbId, &dbCache));
D
dapan1121 已提交
1773
  if (NULL == dbCache) {
dengyihao's avatar
dengyihao 已提交
1774
    ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:0x%" PRIx64, dbFName, msg->dbId);
D
dapan1121 已提交
1775
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
1776 1777 1778
  }

  SCtgVgCache *vgCache = &dbCache->vgCache;
D
dapan1121 已提交
1779
  CTG_ERR_JRET(ctgWLockVgInfo(msg->pCtg, dbCache));
dengyihao's avatar
dengyihao 已提交
1780

D
dapan1121 已提交
1781 1782
  if (vgCache->vgInfo) {
    SDBVgInfo *vgInfo = vgCache->vgInfo;
dengyihao's avatar
dengyihao 已提交
1783

D
dapan1121 已提交
1784
    if (dbInfo->vgVersion < vgInfo->vgVersion) {
dengyihao's avatar
dengyihao 已提交
1785 1786
      ctgDebug("db updateVgroup is ignored, dbFName:%s, vgVer:%d, curVer:%d", dbFName, dbInfo->vgVersion,
               vgInfo->vgVersion);
D
dapan1121 已提交
1787
      ctgWUnlockVgInfo(dbCache);
dengyihao's avatar
dengyihao 已提交
1788

D
dapan1121 已提交
1789
      goto _return;
D
dapan1121 已提交
1790 1791
    }

dengyihao's avatar
dengyihao 已提交
1792 1793 1794 1795
    if (dbInfo->vgVersion == vgInfo->vgVersion && dbInfo->numOfTable == vgInfo->numOfTable &&
        dbInfo->stateTs == vgInfo->stateTs) {
      ctgDebug("no new db vgroup update info, dbFName:%s, vgVer:%d, numOfTable:%d, stateTs:%" PRId64, dbFName,
               dbInfo->vgVersion, dbInfo->numOfTable, dbInfo->stateTs);
D
dapan1121 已提交
1796
      ctgWUnlockVgInfo(dbCache);
dengyihao's avatar
dengyihao 已提交
1797

D
dapan1121 已提交
1798
      goto _return;
D
dapan1121 已提交
1799 1800
    }

1801
    freeVgInfo(vgInfo);
D
dapan1121 已提交
1802
    CTG_DB_NUM_RESET(CTG_CI_DB_VGROUP);
D
dapan1121 已提交
1803 1804 1805 1806
  }

  vgCache->vgInfo = dbInfo;
  msg->dbInfo = NULL;
D
dapan1121 已提交
1807
  CTG_DB_NUM_SET(CTG_CI_DB_VGROUP);
D
dapan1121 已提交
1808

dengyihao's avatar
dengyihao 已提交
1809 1810
  ctgDebug("db vgInfo updated, dbFName:%s, vgVer:%d, stateTs:%" PRId64 ", dbId:0x%" PRIx64, dbFName,
           vgVersion.vgVersion, vgVersion.stateTs, vgVersion.dbId);
D
dapan1121 已提交
1811 1812 1813 1814 1815

  ctgWUnlockVgInfo(dbCache);

  dbCache = NULL;

X
Xiaoyu Wang 已提交
1816 1817 1818 1819
  // if (!IS_SYS_DBNAME(dbFName)) {
  tstrncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
  CTG_ERR_JRET(ctgMetaRentUpdate(&msg->pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion),
                                 ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare));
1820
  //}
D
dapan1121 已提交
1821 1822 1823

_return:

1824
  freeVgInfo(msg->dbInfo);
D
dapan1121 已提交
1825
  taosMemoryFreeClear(msg);
dengyihao's avatar
dengyihao 已提交
1826

D
dapan1121 已提交
1827 1828 1829
  CTG_RET(code);
}

D
dapan1121 已提交
1830
int32_t ctgOpDropDbCache(SCtgCacheOperation *operation) {
dengyihao's avatar
dengyihao 已提交
1831
  int32_t        code = 0;
D
dapan1121 已提交
1832
  SCtgDropDBMsg *msg = operation->data;
dengyihao's avatar
dengyihao 已提交
1833
  SCatalog      *pCtg = msg->pCtg;
D
dapan1121 已提交
1834

D
dapan1121 已提交
1835 1836 1837 1838
  if (pCtg->stopUpdate) {
    goto _return;
  }

D
dapan1121 已提交
1839 1840 1841 1842 1843
  SCtgDBCache *dbCache = NULL;
  ctgGetDBCache(msg->pCtg, msg->dbFName, &dbCache);
  if (NULL == dbCache) {
    goto _return;
  }
dengyihao's avatar
dengyihao 已提交
1844

1845
  if (msg->dbId && dbCache->dbId != msg->dbId) {
dengyihao's avatar
dengyihao 已提交
1846 1847
    ctgInfo("dbId already updated, dbFName:%s, dbId:0x%" PRIx64 ", targetId:0x%" PRIx64, msg->dbFName, dbCache->dbId,
            msg->dbId);
D
dapan1121 已提交
1848 1849
    goto _return;
  }
dengyihao's avatar
dengyihao 已提交
1850

D
dapan1121 已提交
1851 1852 1853 1854 1855
  CTG_ERR_JRET(ctgRemoveDBFromCache(pCtg, dbCache, msg->dbFName));

_return:

  taosMemoryFreeClear(msg);
dengyihao's avatar
dengyihao 已提交
1856

D
dapan1121 已提交
1857 1858 1859
  CTG_RET(code);
}

D
dapan1121 已提交
1860
int32_t ctgOpDropDbVgroup(SCtgCacheOperation *operation) {
dengyihao's avatar
dengyihao 已提交
1861
  int32_t              code = 0;
D
dapan1121 已提交
1862
  SCtgDropDbVgroupMsg *msg = operation->data;
dengyihao's avatar
dengyihao 已提交
1863
  SCatalog            *pCtg = msg->pCtg;
D
dapan1121 已提交
1864

D
dapan1121 已提交
1865 1866 1867 1868
  if (pCtg->stopUpdate) {
    goto _return;
  }

D
dapan1121 已提交
1869 1870 1871 1872 1873
  SCtgDBCache *dbCache = NULL;
  ctgGetDBCache(msg->pCtg, msg->dbFName, &dbCache);
  if (NULL == dbCache) {
    goto _return;
  }
dengyihao's avatar
dengyihao 已提交
1874

dengyihao's avatar
dengyihao 已提交
1875
  CTG_ERR_JRET(ctgWLockVgInfo(pCtg, dbCache));
dengyihao's avatar
dengyihao 已提交
1876

1877
  freeVgInfo(dbCache->vgCache.vgInfo);
D
dapan1121 已提交
1878
  dbCache->vgCache.vgInfo = NULL;
D
dapan1121 已提交
1879

D
dapan1121 已提交
1880
  CTG_DB_NUM_RESET(CTG_CI_DB_VGROUP);
D
dapan1121 已提交
1881 1882
  ctgDebug("db vgInfo removed, dbFName:%s", msg->dbFName);

D
dapan1121 已提交
1883
  ctgWUnlockVgInfo(dbCache);
D
dapan1121 已提交
1884 1885 1886 1887

_return:

  taosMemoryFreeClear(msg);
dengyihao's avatar
dengyihao 已提交
1888

D
dapan1121 已提交
1889 1890 1891
  CTG_RET(code);
}

D
dapan1121 已提交
1892
int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *operation) {
dengyihao's avatar
dengyihao 已提交
1893
  int32_t              code = 0;
D
dapan1121 已提交
1894
  SCtgUpdateTbMetaMsg *msg = operation->data;
dengyihao's avatar
dengyihao 已提交
1895 1896 1897
  SCatalog            *pCtg = msg->pCtg;
  STableMetaOutput    *pMeta = msg->pMeta;
  SCtgDBCache         *dbCache = NULL;
D
dapan1121 已提交
1898

D
dapan1121 已提交
1899 1900 1901
  if (pCtg->stopUpdate) {
    goto _return;
  }
dengyihao's avatar
dengyihao 已提交
1902

D
dapan1121 已提交
1903 1904
  if ((!CTG_IS_META_CTABLE(pMeta->metaType)) && NULL == pMeta->tbMeta) {
    ctgError("no valid tbmeta got from meta rsp, dbFName:%s, tbName:%s", pMeta->dbFName, pMeta->tbName);
D
dapan1121 已提交
1905 1906 1907
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

D
dapan1121 已提交
1908 1909
  if (CTG_IS_META_BOTH(pMeta->metaType) && TSDB_SUPER_TABLE != pMeta->tbMeta->tableType) {
    ctgError("table type error, expected:%d, actual:%d", TSDB_SUPER_TABLE, pMeta->tbMeta->tableType);
D
dapan1121 已提交
1910
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
dengyihao's avatar
dengyihao 已提交
1911 1912
  }

D
dapan1121 已提交
1913
  CTG_ERR_JRET(ctgGetAddDBCache(pCtg, pMeta->dbFName, pMeta->dbId, &dbCache));
D
dapan1121 已提交
1914
  if (NULL == dbCache) {
D
dapan1121 已提交
1915
    ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:0x%" PRIx64, pMeta->dbFName, pMeta->dbId);
D
dapan1121 已提交
1916 1917 1918
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

D
dapan1121 已提交
1919 1920
  if (CTG_IS_META_TABLE(pMeta->metaType) || CTG_IS_META_BOTH(pMeta->metaType)) {
    int32_t metaSize = CTG_META_SIZE(pMeta->tbMeta);
D
dapan1121 已提交
1921
    code = ctgWriteTbMetaToCache(pCtg, dbCache, pMeta->dbFName, pMeta->dbId, pMeta->tbName, pMeta->tbMeta, metaSize);
D
dapan1121 已提交
1922
    pMeta->tbMeta = NULL;
D
dapan1121 已提交
1923
    CTG_ERR_JRET(code);
D
dapan1121 已提交
1924 1925
  }

D
dapan1121 已提交
1926
  if (CTG_IS_META_CTABLE(pMeta->metaType) || CTG_IS_META_BOTH(pMeta->metaType)) {
dengyihao's avatar
dengyihao 已提交
1927
    SCTableMeta *ctbMeta = taosMemoryMalloc(sizeof(SCTableMeta));
D
dapan1121 已提交
1928 1929 1930 1931
    if (NULL == ctbMeta) {
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }
    memcpy(ctbMeta, &pMeta->ctbMeta, sizeof(SCTableMeta));
dengyihao's avatar
dengyihao 已提交
1932 1933
    CTG_ERR_JRET(ctgWriteTbMetaToCache(pCtg, dbCache, pMeta->dbFName, pMeta->dbId, pMeta->ctbName,
                                       (STableMeta *)ctbMeta, sizeof(SCTableMeta)));
D
dapan1121 已提交
1934 1935 1936 1937
  }

_return:

D
dapan1121 已提交
1938 1939
  taosMemoryFreeClear(pMeta->tbMeta);
  taosMemoryFreeClear(pMeta);
dengyihao's avatar
dengyihao 已提交
1940

D
dapan1121 已提交
1941
  taosMemoryFreeClear(msg);
dengyihao's avatar
dengyihao 已提交
1942

D
dapan1121 已提交
1943 1944 1945
  CTG_RET(code);
}

D
dapan1121 已提交
1946
int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) {
dengyihao's avatar
dengyihao 已提交
1947
  int32_t             code = 0;
D
dapan1121 已提交
1948
  SCtgDropStbMetaMsg *msg = operation->data;
dengyihao's avatar
dengyihao 已提交
1949
  SCatalog           *pCtg = msg->pCtg;
D
dapan1121 已提交
1950
  int32_t             tblType = 0;
D
dapan1121 已提交
1951

D
dapan1121 已提交
1952 1953 1954 1955
  if (pCtg->stopUpdate) {
    goto _return;
  }

D
dapan1121 已提交
1956 1957 1958
  SCtgDBCache *dbCache = NULL;
  ctgGetDBCache(pCtg, msg->dbFName, &dbCache);
  if (NULL == dbCache) {
dengyihao's avatar
dengyihao 已提交
1959
    goto _return;
D
dapan1121 已提交
1960 1961 1962
  }

  if (msg->dbId && (dbCache->dbId != msg->dbId)) {
dengyihao's avatar
dengyihao 已提交
1963
    ctgDebug("dbId already modified, dbFName:%s, current:0x%" PRIx64 ", dbId:0x%" PRIx64 ", stb:%s, suid:0x%" PRIx64,
D
dapan1121 已提交
1964
             msg->dbFName, dbCache->dbId, msg->dbId, msg->stbName, msg->suid);
dengyihao's avatar
dengyihao 已提交
1965
    goto _return;
D
dapan1121 已提交
1966
  }
dengyihao's avatar
dengyihao 已提交
1967

D
dapan1121 已提交
1968
  if (taosHashRemove(dbCache->stbCache, &msg->suid, sizeof(msg->suid))) {
dengyihao's avatar
dengyihao 已提交
1969 1970
    ctgDebug("stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:0x%" PRIx64, msg->dbFName,
             msg->stbName, msg->suid);
D
dapan1121 已提交
1971 1972
  }

dengyihao's avatar
dengyihao 已提交
1973
  SCtgTbCache *pTbCache = taosHashGet(dbCache->tbCache, msg->stbName, strlen(msg->stbName));
D
dapan1121 已提交
1974 1975 1976 1977 1978 1979
  if (NULL == pTbCache) {
    ctgDebug("stb %s already not in cache", msg->stbName);
    goto _return;
  }

  CTG_LOCK(CTG_WRITE, &pTbCache->metaLock);
D
dapan1121 已提交
1980
  tblType = pTbCache->pMeta->tableType;
D
dapan1121 已提交
1981 1982 1983
  ctgFreeTbCacheImpl(pTbCache);
  CTG_UNLOCK(CTG_WRITE, &pTbCache->metaLock);

dengyihao's avatar
dengyihao 已提交
1984 1985
  if (taosHashRemove(dbCache->tbCache, msg->stbName, strlen(msg->stbName))) {
    ctgError("stb not exist in cache, dbFName:%s, stb:%s, suid:0x%" PRIx64, msg->dbFName, msg->stbName, msg->suid);
D
dapan1121 已提交
1986
  } else {
D
dapan1121 已提交
1987
    CTG_META_NUM_DEC(tblType);
D
dapan1121 已提交
1988
  }
dengyihao's avatar
dengyihao 已提交
1989 1990

  ctgInfo("stb removed from cache, dbFName:%s, stbName:%s, suid:0x%" PRIx64, msg->dbFName, msg->stbName, msg->suid);
D
dapan1121 已提交
1991 1992

  CTG_ERR_JRET(ctgMetaRentRemove(&msg->pCtg->stbRent, msg->suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare));
dengyihao's avatar
dengyihao 已提交
1993 1994 1995

  ctgDebug("stb removed from rent, dbFName:%s, stbName:%s, suid:0x%" PRIx64, msg->dbFName, msg->stbName, msg->suid);

D
dapan1121 已提交
1996 1997 1998
_return:

  taosMemoryFreeClear(msg);
dengyihao's avatar
dengyihao 已提交
1999

D
dapan1121 已提交
2000 2001 2002
  CTG_RET(code);
}

D
dapan1121 已提交
2003
int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) {
dengyihao's avatar
dengyihao 已提交
2004
  int32_t             code = 0;
D
dapan1121 已提交
2005
  SCtgDropTblMetaMsg *msg = operation->data;
dengyihao's avatar
dengyihao 已提交
2006
  SCatalog           *pCtg = msg->pCtg;
D
dapan1121 已提交
2007
  int32_t             tblType = 0;
D
dapan1121 已提交
2008

D
dapan1121 已提交
2009 2010 2011 2012
  if (pCtg->stopUpdate) {
    goto _return;
  }

D
dapan1121 已提交
2013 2014 2015
  SCtgDBCache *dbCache = NULL;
  ctgGetDBCache(pCtg, msg->dbFName, &dbCache);
  if (NULL == dbCache) {
D
dapan1121 已提交
2016
    goto _return;
D
dapan1121 已提交
2017 2018 2019
  }

  if (dbCache->dbId != msg->dbId) {
dengyihao's avatar
dengyihao 已提交
2020 2021
    ctgDebug("dbId 0x%" PRIx64 " not match with curId 0x%" PRIx64 ", dbFName:%s, tbName:%s", msg->dbId, dbCache->dbId,
             msg->dbFName, msg->tbName);
D
dapan1121 已提交
2022
    goto _return;
D
dapan1121 已提交
2023 2024
  }

dengyihao's avatar
dengyihao 已提交
2025
  SCtgTbCache *pTbCache = taosHashGet(dbCache->tbCache, msg->tbName, strlen(msg->tbName));
D
dapan1121 已提交
2026 2027 2028 2029 2030 2031
  if (NULL == pTbCache) {
    ctgDebug("tb %s already not in cache", msg->tbName);
    goto _return;
  }

  CTG_LOCK(CTG_WRITE, &pTbCache->metaLock);
D
dapan1121 已提交
2032
  tblType = pTbCache->pMeta->tableType;
D
dapan1121 已提交
2033 2034
  ctgFreeTbCacheImpl(pTbCache);
  CTG_UNLOCK(CTG_WRITE, &pTbCache->metaLock);
dengyihao's avatar
dengyihao 已提交
2035

D
dapan1121 已提交
2036 2037 2038
  if (taosHashRemove(dbCache->tbCache, msg->tbName, strlen(msg->tbName))) {
    ctgError("tb %s not exist in cache, dbFName:%s", msg->tbName, msg->dbFName);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
2039
  } else {
D
dapan1121 已提交
2040
    CTG_META_NUM_DEC(tblType);
D
dapan1121 已提交
2041 2042
  }

D
dapan1121 已提交
2043
  ctgDebug("table %s removed from cache, dbFName:%s", msg->tbName, msg->dbFName);
D
dapan1121 已提交
2044 2045 2046 2047 2048 2049 2050 2051

_return:

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
2052
int32_t ctgOpUpdateUser(SCtgCacheOperation *operation) {
dengyihao's avatar
dengyihao 已提交
2053
  int32_t            code = 0;
D
dapan1121 已提交
2054
  SCtgUpdateUserMsg *msg = operation->data;
dengyihao's avatar
dengyihao 已提交
2055 2056
  SCatalog          *pCtg = msg->pCtg;

D
dapan1121 已提交
2057 2058 2059 2060
  if (pCtg->stopUpdate) {
    goto _return;
  }

D
dapan1121 已提交
2061 2062 2063 2064
  SCtgUserAuth *pUser = (SCtgUserAuth *)taosHashGet(pCtg->userCache, msg->userAuth.user, strlen(msg->userAuth.user));
  if (NULL == pUser) {
    SCtgUserAuth userAuth = {0};

D
dapan1121 已提交
2065
    memcpy(&userAuth.userAuth, &msg->userAuth, sizeof(msg->userAuth));
D
dapan1121 已提交
2066 2067 2068 2069 2070 2071 2072 2073

    if (taosHashPut(pCtg->userCache, msg->userAuth.user, strlen(msg->userAuth.user), &userAuth, sizeof(userAuth))) {
      ctgError("taosHashPut user %s to cache failed", msg->userAuth.user);
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }

    taosMemoryFreeClear(msg);

D
dapan1121 已提交
2074 2075
    CTG_CACHE_NUM_INC(CTG_CI_USER, 1);

D
dapan1121 已提交
2076 2077 2078 2079 2080
    return TSDB_CODE_SUCCESS;
  }

  CTG_LOCK(CTG_WRITE, &pUser->lock);

D
dapan1121 已提交
2081 2082
  taosHashCleanup(pUser->userAuth.createdDbs);
  pUser->userAuth.createdDbs = msg->userAuth.createdDbs;
D
dapan1121 已提交
2083 2084
  msg->userAuth.createdDbs = NULL;

D
dapan1121 已提交
2085 2086
  taosHashCleanup(pUser->userAuth.readDbs);
  pUser->userAuth.readDbs = msg->userAuth.readDbs;
D
dapan1121 已提交
2087 2088
  msg->userAuth.readDbs = NULL;

D
dapan1121 已提交
2089 2090
  taosHashCleanup(pUser->userAuth.writeDbs);
  pUser->userAuth.writeDbs = msg->userAuth.writeDbs;
D
dapan1121 已提交
2091 2092
  msg->userAuth.writeDbs = NULL;

X
Xiaoyu Wang 已提交
2093 2094 2095 2096 2097 2098 2099 2100
  taosHashCleanup(pUser->userAuth.readTbs);
  pUser->userAuth.readTbs = msg->userAuth.readTbs;
  msg->userAuth.readTbs = NULL;

  taosHashCleanup(pUser->userAuth.writeTbs);
  pUser->userAuth.writeTbs = msg->userAuth.writeTbs;
  msg->userAuth.writeTbs = NULL;

X
Xiaoyu Wang 已提交
2101 2102 2103 2104
  taosHashCleanup(pUser->userAuth.useDbs);
  pUser->userAuth.useDbs = msg->userAuth.useDbs;
  msg->userAuth.useDbs = NULL;

D
dapan1121 已提交
2105 2106 2107 2108 2109 2110 2111
  CTG_UNLOCK(CTG_WRITE, &pUser->lock);

_return:

  taosHashCleanup(msg->userAuth.createdDbs);
  taosHashCleanup(msg->userAuth.readDbs);
  taosHashCleanup(msg->userAuth.writeDbs);
X
Xiaoyu Wang 已提交
2112 2113
  taosHashCleanup(msg->userAuth.readTbs);
  taosHashCleanup(msg->userAuth.writeTbs);
X
Xiaoyu Wang 已提交
2114
  taosHashCleanup(msg->userAuth.useDbs);
dengyihao's avatar
dengyihao 已提交
2115

D
dapan1121 已提交
2116
  taosMemoryFreeClear(msg);
dengyihao's avatar
dengyihao 已提交
2117

D
dapan1121 已提交
2118 2119 2120
  CTG_RET(code);
}

D
dapan1121 已提交
2121
int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation) {
dengyihao's avatar
dengyihao 已提交
2122
  int32_t             code = 0;
D
dapan1121 已提交
2123
  SCtgUpdateEpsetMsg *msg = operation->data;
dengyihao's avatar
dengyihao 已提交
2124
  SCatalog           *pCtg = msg->pCtg;
dengyihao's avatar
dengyihao 已提交
2125
  SCtgDBCache        *dbCache = NULL;
D
dapan1121 已提交
2126 2127 2128 2129 2130

  if (pCtg->stopUpdate) {
    goto _return;
  }

D
dapan1121 已提交
2131
  CTG_ERR_JRET(ctgGetDBCache(pCtg, msg->dbFName, &dbCache));
D
dapan1121 已提交
2132 2133 2134 2135 2136
  if (NULL == dbCache) {
    ctgDebug("db %s not exist, ignore epset update", msg->dbFName);
    goto _return;
  }

D
dapan1121 已提交
2137 2138
  CTG_ERR_JRET(ctgWLockVgInfo(pCtg, dbCache));

dengyihao's avatar
dengyihao 已提交
2139
  SDBVgInfo *vgInfo = dbCache->vgCache.vgInfo;
D
dapan1121 已提交
2140
  if (NULL == vgInfo) {
D
dapan1121 已提交
2141 2142 2143
    ctgDebug("vgroup in db %s not cached, ignore epset update", msg->dbFName);
    goto _return;
  }
dengyihao's avatar
dengyihao 已提交
2144 2145

  SVgroupInfo *pInfo = taosHashGet(vgInfo->vgHash, &msg->vgId, sizeof(msg->vgId));
D
dapan1121 已提交
2146
  if (NULL == pInfo) {
2147 2148 2149 2150 2151 2152 2153
    ctgDebug("no vgroup %d in db %s vgHash, ignore epset update", msg->vgId, msg->dbFName);
    goto _return;
  }

  SVgroupInfo *pInfo2 = taosArraySearch(vgInfo->vgArray, &msg->vgId, ctgVgInfoIdComp, TD_EQ);
  if (NULL == pInfo2) {
    ctgDebug("no vgroup %d in db %s vgArray, ignore epset update", msg->vgId, msg->dbFName);
D
dapan1121 已提交
2154 2155 2156
    goto _return;
  }

dengyihao's avatar
dengyihao 已提交
2157 2158 2159 2160 2161
  SEp *pOrigEp = &pInfo->epSet.eps[pInfo->epSet.inUse];
  SEp *pNewEp = &msg->epSet.eps[msg->epSet.inUse];
  ctgDebug("vgroup %d epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d, dbFName:%s in ctg", pInfo->vgId,
           pInfo->epSet.inUse, pInfo->epSet.numOfEps, pOrigEp->fqdn, pOrigEp->port, msg->epSet.inUse,
           msg->epSet.numOfEps, pNewEp->fqdn, pNewEp->port, msg->dbFName);
D
dapan1121 已提交
2162

D
dapan1121 已提交
2163
  pInfo->epSet = msg->epSet;
2164
  pInfo2->epSet = msg->epSet;
D
dapan1121 已提交
2165 2166 2167

_return:

D
dapan1121 已提交
2168
  if (code == TSDB_CODE_SUCCESS && dbCache) {
D
dapan1121 已提交
2169
    ctgWUnlockVgInfo(dbCache);
D
dapan1121 已提交
2170 2171 2172
  }

  taosMemoryFreeClear(msg);
dengyihao's avatar
dengyihao 已提交
2173

D
dapan1121 已提交
2174 2175 2176
  CTG_RET(code);
}

D
dapan1121 已提交
2177
int32_t ctgOpUpdateTbIndex(SCtgCacheOperation *operation) {
dengyihao's avatar
dengyihao 已提交
2178
  int32_t               code = 0;
D
dapan1121 已提交
2179
  SCtgUpdateTbIndexMsg *msg = operation->data;
dengyihao's avatar
dengyihao 已提交
2180 2181 2182 2183
  SCatalog             *pCtg = msg->pCtg;
  STableIndex          *pIndex = msg->pIndex;
  SCtgDBCache          *dbCache = NULL;

D
dapan1121 已提交
2184 2185 2186 2187
  if (pCtg->stopUpdate) {
    goto _return;
  }

D
dapan1121 已提交
2188
  CTG_ERR_JRET(ctgGetAddDBCache(pCtg, pIndex->dbFName, 0, &dbCache));
D
dapan1121 已提交
2189 2190 2191 2192 2193 2194 2195 2196 2197

  CTG_ERR_JRET(ctgWriteTbIndexToCache(pCtg, dbCache, pIndex->dbFName, pIndex->tbName, &pIndex));

_return:

  if (pIndex) {
    taosArrayDestroyEx(pIndex->pIndex, tFreeSTableIndexInfo);
    taosMemoryFreeClear(pIndex);
  }
dengyihao's avatar
dengyihao 已提交
2198

D
dapan1121 已提交
2199
  taosMemoryFreeClear(msg);
dengyihao's avatar
dengyihao 已提交
2200

D
dapan1121 已提交
2201 2202 2203 2204
  CTG_RET(code);
}

int32_t ctgOpDropTbIndex(SCtgCacheOperation *operation) {
dengyihao's avatar
dengyihao 已提交
2205
  int32_t             code = 0;
D
dapan1121 已提交
2206
  SCtgDropTbIndexMsg *msg = operation->data;
dengyihao's avatar
dengyihao 已提交
2207 2208 2209
  SCatalog           *pCtg = msg->pCtg;
  SCtgDBCache        *dbCache = NULL;

D
dapan1121 已提交
2210 2211 2212 2213
  if (pCtg->stopUpdate) {
    goto _return;
  }

D
dapan1121 已提交
2214
  CTG_ERR_JRET(ctgGetDBCache(pCtg, msg->dbFName, &dbCache));
D
dapan1121 已提交
2215
  if (NULL == dbCache) {
D
dapan1121 已提交
2216 2217 2218
    return TSDB_CODE_SUCCESS;
  }

dengyihao's avatar
dengyihao 已提交
2219
  STableIndex *pIndex = taosMemoryCalloc(1, sizeof(STableIndex));
D
dapan1121 已提交
2220 2221
  if (NULL == pIndex) {
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
2222
  }
D
dapan1121 已提交
2223 2224 2225
  strcpy(pIndex->tbName, msg->tbName);
  strcpy(pIndex->dbFName, msg->dbFName);
  pIndex->version = -1;
D
dapan1121 已提交
2226

D
dapan1121 已提交
2227
  CTG_ERR_JRET(ctgWriteTbIndexToCache(pCtg, dbCache, pIndex->dbFName, pIndex->tbName, &pIndex));
D
dapan1121 已提交
2228 2229 2230 2231 2232 2233 2234

_return:

  if (pIndex) {
    taosArrayDestroyEx(pIndex->pIndex, tFreeSTableIndexInfo);
    taosMemoryFreeClear(pIndex);
  }
dengyihao's avatar
dengyihao 已提交
2235

D
dapan1121 已提交
2236
  taosMemoryFreeClear(msg);
dengyihao's avatar
dengyihao 已提交
2237

D
dapan1121 已提交
2238 2239 2240
  CTG_RET(code);
}

D
dapan1121 已提交
2241
int32_t ctgOpClearCache(SCtgCacheOperation *operation) {
dengyihao's avatar
dengyihao 已提交
2242
  int32_t            code = 0;
D
dapan1121 已提交
2243
  SCtgClearCacheMsg *msg = operation->data;
dengyihao's avatar
dengyihao 已提交
2244
  SCatalog          *pCtg = msg->pCtg;
D
dapan1121 已提交
2245

D
dapan1121 已提交
2246 2247
  CTG_LOCK(CTG_WRITE, &gCtgMgmt.lock);

D
dapan1121 已提交
2248
  if (pCtg) {
D
dapan1121 已提交
2249 2250 2251 2252 2253
    if (msg->freeCtg) {
      ctgFreeHandle(pCtg);
    } else {
      ctgClearHandle(pCtg);
    }
dengyihao's avatar
dengyihao 已提交
2254

D
dapan1121 已提交
2255 2256
    goto _return;
  }
D
dapan1121 已提交
2257 2258 2259 2260 2261 2262

  if (msg->freeCtg) {
    ctgFreeAllInstance();
  } else {
    ctgClearAllInstance();
  }
D
dapan1121 已提交
2263 2264

_return:
D
dapan1121 已提交
2265 2266

  CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.lock);
dengyihao's avatar
dengyihao 已提交
2267

D
dapan1121 已提交
2268
  taosMemoryFreeClear(msg);
dengyihao's avatar
dengyihao 已提交
2269

D
dapan1121 已提交
2270 2271 2272
  CTG_RET(code);
}

2273 2274 2275 2276 2277 2278 2279 2280
void ctgFreeCacheOperationData(SCtgCacheOperation *op) {
  if (NULL == op || NULL == op->data) {
    return;
  }

  switch (op->opId) {
    case CTG_OP_UPDATE_VGROUP: {
      SCtgUpdateVgMsg *msg = op->data;
2281
      freeVgInfo(msg->dbInfo);
2282 2283 2284 2285 2286 2287 2288 2289 2290 2291 2292 2293 2294 2295 2296 2297 2298 2299 2300 2301 2302 2303 2304 2305 2306
      taosMemoryFreeClear(op->data);
      break;
    }
    case CTG_OP_UPDATE_TB_META: {
      SCtgUpdateTbMetaMsg *msg = op->data;
      taosMemoryFreeClear(msg->pMeta->tbMeta);
      taosMemoryFreeClear(msg->pMeta);
      taosMemoryFreeClear(op->data);
      break;
    }
    case CTG_OP_DROP_DB_CACHE:
    case CTG_OP_DROP_DB_VGROUP:
    case CTG_OP_DROP_STB_META:
    case CTG_OP_DROP_TB_META:
    case CTG_OP_UPDATE_VG_EPSET:
    case CTG_OP_DROP_TB_INDEX:
    case CTG_OP_CLEAR_CACHE: {
      taosMemoryFreeClear(op->data);
      break;
    }
    case CTG_OP_UPDATE_USER: {
      SCtgUpdateUserMsg *msg = op->data;
      taosHashCleanup(msg->userAuth.createdDbs);
      taosHashCleanup(msg->userAuth.readDbs);
      taosHashCleanup(msg->userAuth.writeDbs);
X
Xiaoyu Wang 已提交
2307 2308
      taosHashCleanup(msg->userAuth.readTbs);
      taosHashCleanup(msg->userAuth.writeTbs);
X
Xiaoyu Wang 已提交
2309
      taosHashCleanup(msg->userAuth.useDbs);
2310 2311 2312 2313 2314 2315 2316 2317 2318 2319 2320 2321 2322 2323 2324 2325 2326 2327 2328
      taosMemoryFreeClear(op->data);
      break;
    }
    case CTG_OP_UPDATE_TB_INDEX: {
      SCtgUpdateTbIndexMsg *msg = op->data;
      if (msg->pIndex) {
        taosArrayDestroyEx(msg->pIndex->pIndex, tFreeSTableIndexInfo);
        taosMemoryFreeClear(msg->pIndex);
      }
      taosMemoryFreeClear(op->data);
      break;
    }
    default: {
      qError("invalid cache op id:%d", op->opId);
      break;
    }
  }
}

D
dapan1121 已提交
2329
void ctgCleanupCacheQueue(void) {
dengyihao's avatar
dengyihao 已提交
2330 2331
  SCtgQNode          *node = NULL;
  SCtgQNode          *nodeNext = NULL;
D
dapan1121 已提交
2332
  SCtgCacheOperation *op = NULL;
dengyihao's avatar
dengyihao 已提交
2333
  bool                stopQueue = false;
D
dapan1121 已提交
2334 2335 2336 2337 2338

  while (true) {
    node = gCtgMgmt.queue.head->next;
    while (node) {
      if (node->op) {
D
dapan1121 已提交
2339 2340 2341 2342 2343 2344
        op = node->op;
        if (op->stopQueue) {
          SCatalog *pCtg = ((SCtgUpdateMsgHeader *)op->data)->pCtg;
          ctgDebug("process [%s] operation", gCtgCacheOperation[op->opId].name);
          (*gCtgCacheOperation[op->opId].func)(op);
          stopQueue = true;
D
dapan1121 已提交
2345
          CTG_STAT_RT_INC(numOfOpDequeue, 1);
D
dapan1121 已提交
2346
        } else {
2347
          ctgFreeCacheOperationData(op);
D
dapan1121 已提交
2348
          CTG_STAT_RT_INC(numOfOpAbort, 1);
D
dapan1121 已提交
2349
        }
dengyihao's avatar
dengyihao 已提交
2350

D
dapan1121 已提交
2351 2352
        if (op->syncOp) {
          tsem_post(&op->rspSem);
D
dapan1121 已提交
2353
        } else {
D
dapan1121 已提交
2354
          taosMemoryFree(op);
D
dapan1121 已提交
2355
        }
D
dapan1121 已提交
2356
      }
D
dapan1121 已提交
2357 2358 2359

      nodeNext = node->next;
      taosMemoryFree(node);
dengyihao's avatar
dengyihao 已提交
2360

D
dapan1121 已提交
2361
      node = nodeNext;
D
dapan1121 已提交
2362 2363
    }

D
dapan1121 已提交
2364
    if (!stopQueue) {
D
dapan1121 已提交
2365 2366 2367 2368
      taosUsleep(1);
    } else {
      break;
    }
D
dapan1121 已提交
2369 2370 2371 2372 2373 2374
  }

  taosMemoryFreeClear(gCtgMgmt.queue.head);
  gCtgMgmt.queue.tail = NULL;
}

dengyihao's avatar
dengyihao 已提交
2375
void *ctgUpdateThreadFunc(void *param) {
D
dapan1121 已提交
2376
  setThreadName("catalog");
2377

D
dapan1121 已提交
2378 2379 2380 2381 2382 2383
  qInfo("catalog update thread started");

  while (true) {
    if (tsem_wait(&gCtgMgmt.queue.reqSem)) {
      qError("ctg tsem_wait failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
    }
dengyihao's avatar
dengyihao 已提交
2384

2385
    if (atomic_load_8((int8_t *)&gCtgMgmt.queue.stopQueue)) {
D
dapan1121 已提交
2386
      ctgCleanupCacheQueue();
D
dapan1121 已提交
2387 2388 2389
      break;
    }

D
dapan1121 已提交
2390 2391 2392
    SCtgCacheOperation *operation = NULL;
    ctgDequeue(&operation);
    SCatalog *pCtg = ((SCtgUpdateMsgHeader *)operation->data)->pCtg;
D
dapan1121 已提交
2393

D
dapan1121 已提交
2394
    ctgDebug("process [%s] operation", gCtgCacheOperation[operation->opId].name);
dengyihao's avatar
dengyihao 已提交
2395

D
dapan1121 已提交
2396
    (*gCtgCacheOperation[operation->opId].func)(operation);
D
dapan1121 已提交
2397

D
dapan1121 已提交
2398
    if (operation->syncOp) {
D
dapan1121 已提交
2399
      tsem_post(&operation->rspSem);
D
dapan1121 已提交
2400 2401
    } else {
      taosMemoryFreeClear(operation);
D
dapan1121 已提交
2402 2403
    }

D
dapan1121 已提交
2404
    CTG_STAT_RT_INC(numOfOpDequeue, 1);
D
dapan1121 已提交
2405

D
dapan1121 已提交
2406
    ctgdShowCacheInfo();
D
dapan1121 已提交
2407
    ctgdShowStatInfo();
D
dapan1121 已提交
2408 2409 2410
  }

  qInfo("catalog update thread stopped");
dengyihao's avatar
dengyihao 已提交
2411

D
dapan1121 已提交
2412 2413 2414 2415 2416 2417 2418 2419 2420 2421 2422 2423
  return NULL;
}

int32_t ctgStartUpdateThread() {
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

  if (taosThreadCreate(&gCtgMgmt.updateThread, &thAttr, ctgUpdateThreadFunc, NULL) != 0) {
    terrno = TAOS_SYSTEM_ERROR(errno);
    CTG_ERR_RET(terrno);
  }
dengyihao's avatar
dengyihao 已提交
2424

D
dapan1121 已提交
2425 2426 2427 2428
  taosThreadAttrDestroy(&thAttr);
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
2429
int32_t ctgGetTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta **pTableMeta) {
D
dapan1121 已提交
2430
  if (IS_SYS_DBNAME(ctx->pName->dbname)) {
D
dapan1121 已提交
2431 2432 2433 2434 2435 2436 2437 2438 2439 2440 2441 2442
    CTG_FLAG_SET_SYS_DB(ctx->flag);
  }

  CTG_ERR_RET(ctgReadTbMetaFromCache(pCtg, ctx, pTableMeta));

  if (*pTableMeta) {
    if (CTG_FLAG_MATCH_STB(ctx->flag, (*pTableMeta)->tableType) &&
        ((!CTG_FLAG_IS_FORCE_UPDATE(ctx->flag)) || (CTG_FLAG_IS_SYS_DB(ctx->flag)))) {
      return TSDB_CODE_SUCCESS;
    }

    taosMemoryFreeClear(*pTableMeta);
dengyihao's avatar
dengyihao 已提交
2443
    *pTableMeta = NULL;
D
dapan1121 已提交
2444 2445 2446 2447 2448 2449 2450 2451 2452
  }

  if (CTG_FLAG_IS_UNKNOWN_STB(ctx->flag)) {
    CTG_FLAG_SET_STB(ctx->flag, ctx->tbInfo.tbType);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
2453
#if 0
2454
int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetasCtx* ctx, SArray** pResList) {
D
dapan1121 已提交
2455 2456 2457 2458 2459 2460 2461 2462 2463 2464 2465 2466 2467 2468 2469 2470 2471 2472 2473 2474 2475 2476 2477 2478 2479 2480 2481 2482 2483 2484 2485 2486 2487 2488 2489 2490 2491
  int32_t tbNum = taosArrayGetSize(ctx->pNames);
  SName* fName = taosArrayGet(ctx->pNames, 0);
  int32_t fIdx = 0;
  
  for (int32_t i = 0; i < tbNum; ++i) {
    SName* pName = taosArrayGet(ctx->pNames, i);
    SCtgTbMetaCtx nctx = {0};
    nctx.flag = CTG_FLAG_UNKNOWN_STB;
    nctx.pName = pName;
    
    if (IS_SYS_DBNAME(pName->dbname)) {
      CTG_FLAG_SET_SYS_DB(nctx.flag);
    }

    STableMeta *pTableMeta = NULL;
    CTG_ERR_RET(ctgReadTbMetaFromCache(pCtg, &nctx, &pTableMeta));
    SMetaRes res = {0};
    
    if (pTableMeta) {
      if (CTG_FLAG_MATCH_STB(nctx.flag, pTableMeta->tableType) &&
          ((!CTG_FLAG_IS_FORCE_UPDATE(nctx.flag)) || (CTG_FLAG_IS_SYS_DB(nctx.flag)))) {
        res.pRes = pTableMeta;
      } else {
        taosMemoryFreeClear(pTableMeta);
      }
    }

    if (NULL == res.pRes) {
      if (NULL == ctx->pFetchs) {
        ctx->pFetchs = taosArrayInit(tbNum, sizeof(SCtgFetch));
      }
      
      if (CTG_FLAG_IS_UNKNOWN_STB(nctx.flag)) {
        CTG_FLAG_SET_STB(nctx.flag, nctx.tbInfo.tbType);
      }

      SCtgFetch fetch = {0};
2492
      fetch.tbIdx = i;
D
dapan1121 已提交
2493 2494 2495 2496 2497 2498
      fetch.fetchIdx = fIdx++;
      fetch.flag = nctx.flag;

      taosArrayPush(ctx->pFetchs, &fetch);
    }
    
2499
    taosArrayPush(ctx->pResList, &res);
D
dapan1121 已提交
2500 2501 2502
  }

  if (NULL == ctx->pFetchs) {
2503
    TSWAP(*pResList, ctx->pResList);
D
dapan1121 已提交
2504 2505 2506 2507
  }

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
2508 2509
#endif

dengyihao's avatar
dengyihao 已提交
2510 2511 2512 2513 2514 2515 2516 2517 2518
int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMetasCtx *ctx, int32_t dbIdx,
                               int32_t *fetchIdx, int32_t baseResIdx, SArray *pList) {
  int32_t     tbNum = taosArrayGetSize(pList);
  SName      *pName = taosArrayGet(pList, 0);
  char        dbFName[TSDB_DB_FNAME_LEN] = {0};
  int32_t     flag = CTG_FLAG_UNKNOWN_STB;
  uint64_t    lastSuid = 0;
  STableMeta *lastTableMeta = NULL;

D
dapan1121 已提交
2519 2520 2521 2522 2523 2524 2525 2526
  if (IS_SYS_DBNAME(pName->dbname)) {
    CTG_FLAG_SET_SYS_DB(flag);
    strcpy(dbFName, pName->dbname);
  } else {
    tNameGetFullDbName(pName, dbFName);
  }

  SCtgDBCache *dbCache = NULL;
dengyihao's avatar
dengyihao 已提交
2527
  SCtgTbCache *pCache = NULL;
D
dapan1121 已提交
2528
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
dengyihao's avatar
dengyihao 已提交
2529

D
dapan1121 已提交
2530 2531 2532
  if (NULL == dbCache) {
    ctgDebug("db %s not in cache", dbFName);
    for (int32_t i = 0; i < tbNum; ++i) {
2533
      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
H
Haojun Liao 已提交
2534
      taosArrayPush(ctx->pResList, &(SMetaData){0});
D
dapan1121 已提交
2535 2536 2537 2538 2539 2540
    }

    return TSDB_CODE_SUCCESS;
  }

  for (int32_t i = 0; i < tbNum; ++i) {
H
Haojun Liao 已提交
2541
    pName = taosArrayGet(pList, i);
D
dapan1121 已提交
2542 2543 2544 2545

    pCache = taosHashAcquire(dbCache->tbCache, pName->tname, strlen(pName->tname));
    if (NULL == pCache) {
      ctgDebug("tb %s not in cache, dbFName:%s", pName->tname, dbFName);
2546
      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
H
Haojun Liao 已提交
2547
      taosArrayPush(ctx->pResList, &(SMetaRes){0});
D
dapan1121 已提交
2548
      CTG_META_NHIT_INC();
dengyihao's avatar
dengyihao 已提交
2549

D
dapan1121 已提交
2550 2551 2552 2553 2554
      continue;
    }

    CTG_LOCK(CTG_READ, &pCache->metaLock);
    if (NULL == pCache->pMeta) {
K
kailixu 已提交
2555
      CTG_UNLOCK(CTG_READ, &pCache->metaLock);
D
dapan1121 已提交
2556
      ctgDebug("tb %s meta not in cache, dbFName:%s", pName->tname, dbFName);
2557
      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
H
Haojun Liao 已提交
2558
      taosArrayPush(ctx->pResList, &(SMetaRes){0});
D
dapan1121 已提交
2559
      CTG_META_NHIT_INC();
dengyihao's avatar
dengyihao 已提交
2560

D
dapan1121 已提交
2561 2562 2563
      continue;
    }

dengyihao's avatar
dengyihao 已提交
2564
    STableMeta *tbMeta = pCache->pMeta;
D
dapan1121 已提交
2565

D
dapan1121 已提交
2566
    CTG_META_HIT_INC(tbMeta->tableType);
X
Xiaoyu Wang 已提交
2567

D
dapan1121 已提交
2568 2569 2570 2571 2572 2573 2574
    SCtgTbMetaCtx nctx = {0};
    nctx.flag = flag;
    nctx.tbInfo.inCache = true;
    nctx.tbInfo.dbId = dbCache->dbId;
    nctx.tbInfo.suid = tbMeta->suid;
    nctx.tbInfo.tbType = tbMeta->tableType;

dengyihao's avatar
dengyihao 已提交
2575 2576
    SMetaRes    res = {0};
    STableMeta *pTableMeta = NULL;
D
dapan1121 已提交
2577 2578 2579 2580
    if (tbMeta->tableType != TSDB_CHILD_TABLE) {
      int32_t metaSize = CTG_META_SIZE(tbMeta);
      pTableMeta = taosMemoryCalloc(1, metaSize);
      if (NULL == pTableMeta) {
2581
        ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
D
dapan1121 已提交
2582 2583
        CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
      }
dengyihao's avatar
dengyihao 已提交
2584

D
dapan1121 已提交
2585
      memcpy(pTableMeta, tbMeta, metaSize);
dengyihao's avatar
dengyihao 已提交
2586

2587
      CTG_UNLOCK(CTG_READ, &pCache->metaLock);
dengyihao's avatar
dengyihao 已提交
2588 2589
      taosHashRelease(dbCache->tbCache, pCache);

D
dapan1121 已提交
2590
      ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pName->tname, tbMeta->tableType, dbFName);
dengyihao's avatar
dengyihao 已提交
2591

D
dapan1121 已提交
2592
      res.pRes = pTableMeta;
2593
      taosArrayPush(ctx->pResList, &res);
D
dapan1121 已提交
2594 2595 2596

      continue;
    }
dengyihao's avatar
dengyihao 已提交
2597

D
dapan1121 已提交
2598 2599 2600 2601
    // PROCESS FOR CHILD TABLE

    if (lastSuid && tbMeta->suid == lastSuid && lastTableMeta) {
      cloneTableMeta(lastTableMeta, &pTableMeta);
2602
      memcpy(pTableMeta, tbMeta, sizeof(SCTableMeta));
D
dapan1121 已提交
2603

2604
      CTG_UNLOCK(CTG_READ, &pCache->metaLock);
dengyihao's avatar
dengyihao 已提交
2605 2606
      taosHashRelease(dbCache->tbCache, pCache);

D
dapan1121 已提交
2607
      ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pName->tname, tbMeta->tableType, dbFName);
dengyihao's avatar
dengyihao 已提交
2608

D
dapan1121 已提交
2609
      res.pRes = pTableMeta;
2610
      taosArrayPush(ctx->pResList, &res);
dengyihao's avatar
dengyihao 已提交
2611

D
dapan1121 已提交
2612 2613
      continue;
    }
dengyihao's avatar
dengyihao 已提交
2614

D
dapan1121 已提交
2615 2616 2617
    int32_t metaSize = sizeof(SCTableMeta);
    pTableMeta = taosMemoryCalloc(1, metaSize);
    if (NULL == pTableMeta) {
dengyihao's avatar
dengyihao 已提交
2618
      ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
D
dapan1121 已提交
2619 2620
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
dengyihao's avatar
dengyihao 已提交
2621

D
dapan1121 已提交
2622
    memcpy(pTableMeta, tbMeta, metaSize);
dengyihao's avatar
dengyihao 已提交
2623

2624
    CTG_UNLOCK(CTG_READ, &pCache->metaLock);
dengyihao's avatar
dengyihao 已提交
2625 2626 2627 2628 2629 2630
    taosHashRelease(dbCache->tbCache, pCache);

    ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", pName->tname,
             nctx.tbInfo.tbType, dbFName);

    char *stName = taosHashAcquire(dbCache->stbCache, &pTableMeta->suid, sizeof(pTableMeta->suid));
D
dapan1121 已提交
2631 2632
    if (NULL == stName) {
      ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", pTableMeta->suid, dbFName);
2633
      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
H
Haojun Liao 已提交
2634
      taosArrayPush(ctx->pResList, &(SMetaRes){0});
D
dapan1121 已提交
2635
      taosMemoryFreeClear(pTableMeta);
D
dapan1121 已提交
2636 2637

      CTG_META_NHIT_INC();
D
dapan1121 已提交
2638 2639 2640 2641 2642 2643 2644
      continue;
    }

    pCache = taosHashAcquire(dbCache->tbCache, stName, strlen(stName));
    if (NULL == pCache) {
      ctgDebug("stb 0x%" PRIx64 " name %s not in cache, dbFName:%s", pTableMeta->suid, stName, dbFName);
      taosHashRelease(dbCache->stbCache, stName);
dengyihao's avatar
dengyihao 已提交
2645

2646
      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
H
Haojun Liao 已提交
2647
      taosArrayPush(ctx->pResList, &(SMetaRes){0});
dengyihao's avatar
dengyihao 已提交
2648
      taosMemoryFreeClear(pTableMeta);
X
Xiaoyu Wang 已提交
2649

D
dapan1121 已提交
2650
      CTG_META_NHIT_INC();
D
dapan1121 已提交
2651 2652 2653 2654 2655 2656 2657 2658
      continue;
    }

    taosHashRelease(dbCache->stbCache, stName);

    CTG_LOCK(CTG_READ, &pCache->metaLock);
    if (NULL == pCache->pMeta) {
      ctgDebug("stb 0x%" PRIx64 " meta not in cache, dbFName:%s", pTableMeta->suid, dbFName);
2659
      CTG_UNLOCK(CTG_READ, &pCache->metaLock);
dengyihao's avatar
dengyihao 已提交
2660 2661
      taosHashRelease(dbCache->tbCache, pCache);

2662
      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
H
Haojun Liao 已提交
2663
      taosArrayPush(ctx->pResList, &(SMetaRes){0});
D
dapan1121 已提交
2664 2665
      taosMemoryFreeClear(pTableMeta);

D
dapan1121 已提交
2666
      CTG_META_NHIT_INC();
D
dapan1121 已提交
2667 2668
      continue;
    }
dengyihao's avatar
dengyihao 已提交
2669 2670 2671

    STableMeta *stbMeta = pCache->pMeta;
    if (stbMeta->suid != nctx.tbInfo.suid) {
2672
      CTG_UNLOCK(CTG_READ, &pCache->metaLock);
dengyihao's avatar
dengyihao 已提交
2673 2674 2675 2676 2677
      taosHashRelease(dbCache->tbCache, pCache);

      ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%" PRIx64, stbMeta->suid,
               nctx.tbInfo.suid);

2678
      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
H
Haojun Liao 已提交
2679
      taosArrayPush(ctx->pResList, &(SMetaRes){0});
D
dapan1121 已提交
2680 2681
      taosMemoryFreeClear(pTableMeta);

D
dapan1121 已提交
2682
      CTG_META_NHIT_INC();
D
dapan1121 已提交
2683 2684
      continue;
    }
dengyihao's avatar
dengyihao 已提交
2685

D
dapan1121 已提交
2686 2687
    metaSize = CTG_META_SIZE(stbMeta);
    pTableMeta = taosMemoryRealloc(pTableMeta, metaSize);
dengyihao's avatar
dengyihao 已提交
2688
    if (NULL == pTableMeta) {
2689
      ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
D
dapan1121 已提交
2690 2691
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
dengyihao's avatar
dengyihao 已提交
2692

D
dapan1121 已提交
2693
    memcpy(&pTableMeta->sversion, &stbMeta->sversion, metaSize - sizeof(SCTableMeta));
dengyihao's avatar
dengyihao 已提交
2694

2695
    CTG_UNLOCK(CTG_READ, &pCache->metaLock);
dengyihao's avatar
dengyihao 已提交
2696 2697
    taosHashRelease(dbCache->tbCache, pCache);

D
dapan1121 已提交
2698 2699
    CTG_META_HIT_INC(pTableMeta->tableType);

D
dapan1121 已提交
2700
    res.pRes = pTableMeta;
2701
    taosArrayPush(ctx->pResList, &res);
D
dapan1121 已提交
2702 2703 2704 2705 2706

    lastSuid = pTableMeta->suid;
    lastTableMeta = pTableMeta;
  }

2707
  ctgReleaseDBCache(pCtg, dbCache);
dengyihao's avatar
dengyihao 已提交
2708

D
dapan1121 已提交
2709 2710 2711
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2712
int32_t ctgRemoveTbMetaFromCache(SCatalog *pCtg, SName *pTableName, bool syncReq) {
D
dapan1121 已提交
2713
  int32_t       code = 0;
dengyihao's avatar
dengyihao 已提交
2714
  STableMeta   *tblMeta = NULL;
D
dapan1121 已提交
2715 2716 2717 2718 2719 2720 2721 2722 2723 2724 2725 2726 2727 2728 2729 2730 2731 2732 2733 2734 2735 2736 2737 2738 2739 2740 2741 2742
  SCtgTbMetaCtx tbCtx = {0};
  tbCtx.flag = CTG_FLAG_UNKNOWN_STB;
  tbCtx.pName = pTableName;

  CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &tbCtx, &tblMeta));

  if (NULL == tblMeta) {
    ctgDebug("table already not in cache, db:%s, tblName:%s", pTableName->dbname, pTableName->tname);
    return TSDB_CODE_SUCCESS;
  }

  char dbFName[TSDB_DB_FNAME_LEN];
  tNameGetFullDbName(pTableName, dbFName);

  if (TSDB_SUPER_TABLE == tblMeta->tableType) {
    CTG_ERR_JRET(ctgDropStbMetaEnqueue(pCtg, dbFName, tbCtx.tbInfo.dbId, pTableName->tname, tblMeta->suid, syncReq));
  } else {
    CTG_ERR_JRET(ctgDropTbMetaEnqueue(pCtg, dbFName, tbCtx.tbInfo.dbId, pTableName->tname, syncReq));
  }

_return:

  taosMemoryFreeClear(tblMeta);

  CTG_RET(code);
}

int32_t ctgGetTbHashVgroupFromCache(SCatalog *pCtg, const SName *pTableName, SVgroupInfo **pVgroup) {
D
dapan1121 已提交
2743
  if (IS_SYS_DBNAME(pTableName->dbname)) {
D
dapan1121 已提交
2744 2745 2746 2747
    ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname);
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

dengyihao's avatar
dengyihao 已提交
2748
  SCtgDBCache *dbCache = NULL;
D
dapan1121 已提交
2749 2750 2751 2752 2753 2754 2755 2756 2757 2758 2759 2760 2761 2762 2763 2764 2765 2766 2767 2768 2769 2770 2771 2772 2773 2774
  int32_t      code = 0;
  char         dbFName[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, dbFName);

  CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache));

  if (NULL == dbCache) {
    *pVgroup = NULL;
    return TSDB_CODE_SUCCESS;
  }

  *pVgroup = taosMemoryCalloc(1, sizeof(SVgroupInfo));
  CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pTableName, *pVgroup));

_return:

  if (dbCache) {
    ctgReleaseVgInfoToCache(pCtg, dbCache);
  }

  if (code) {
    taosMemoryFreeClear(*pVgroup);
  }

  CTG_RET(code);
}