ctgCache.c 76.2 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "catalogInt.h"
dengyihao's avatar
dengyihao 已提交
17
#include "query.h"
D
dapan1121 已提交
18
#include "systable.h"
dengyihao's avatar
dengyihao 已提交
19 20
#include "tname.h"
#include "trpc.h"
D
dapan1121 已提交
21

dengyihao's avatar
dengyihao 已提交
22 23 24 25 26 27 28 29 30 31 32
/*
 * Table of catalog cache operations. Each row holds a CTG_OP_* id, a
 * human-readable name used in log messages, and the ctgOp* routine listed
 * for that operation. ctgEnqueue() looks rows up as
 * gCtgCacheOperation[opId].name, so rows are presumably stored at the index
 * equal to their id -- confirm against the CTG_OP_* enum before reordering.
 */
SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = {
    {CTG_OP_UPDATE_VGROUP, "update vgInfo", ctgOpUpdateVgroup},
    {CTG_OP_UPDATE_TB_META, "update tbMeta", ctgOpUpdateTbMeta},
    {CTG_OP_DROP_DB_CACHE, "drop DB", ctgOpDropDbCache},
    {CTG_OP_DROP_DB_VGROUP, "drop DBVgroup", ctgOpDropDbVgroup},
    {CTG_OP_DROP_STB_META, "drop stbMeta", ctgOpDropStbMeta},
    {CTG_OP_DROP_TB_META, "drop tbMeta", ctgOpDropTbMeta},
    {CTG_OP_UPDATE_USER, "update user", ctgOpUpdateUser},
    {CTG_OP_UPDATE_VG_EPSET, "update epset", ctgOpUpdateEpset},
    {CTG_OP_UPDATE_TB_INDEX, "update tbIndex", ctgOpUpdateTbIndex},
    {CTG_OP_DROP_TB_INDEX, "drop tbIndex", ctgOpDropTbIndex},
    {CTG_OP_CLEAR_CACHE, "clear cache", ctgOpClearCache},
};
D
dapan1121 已提交
33

D
dapan1121 已提交
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
/*
 * Descriptors for the cache statistic items: a fixed-width (10 character)
 * label and a CTG_CI_FLAG_LEVEL_* flag per row. The comment in front of each
 * row names the CTG_CI_* index the row is intended for; rows are positional,
 * so their order must follow that enum (not visible here -- verify before
 * inserting or reordering rows).
 */
SCtgCacheItemInfo gCtgStatItem[CTG_CI_MAX_VALUE] = {
    /* CTG_CI_CLUSTER         */ {"Cluster   ", CTG_CI_FLAG_LEVEL_GLOBAL},
    /* CTG_CI_DNODE           */ {"Dnode     ", CTG_CI_FLAG_LEVEL_CLUSTER},
    /* CTG_CI_QNODE           */ {"Qnode     ", CTG_CI_FLAG_LEVEL_CLUSTER},
    /* CTG_CI_DB              */ {"DB        ", CTG_CI_FLAG_LEVEL_CLUSTER},
    /* CTG_CI_DB_VGROUP       */ {"DbVgroup  ", CTG_CI_FLAG_LEVEL_DB},
    /* CTG_CI_DB_CFG          */ {"DbCfg     ", CTG_CI_FLAG_LEVEL_DB},
    /* CTG_CI_DB_INFO         */ {"DbInfo    ", CTG_CI_FLAG_LEVEL_DB},
    /* CTG_CI_STABLE_META     */ {"StbMeta   ", CTG_CI_FLAG_LEVEL_DB},
    /* CTG_CI_NTABLE_META     */ {"NtbMeta   ", CTG_CI_FLAG_LEVEL_DB},
    /* CTG_CI_CTABLE_META     */ {"CtbMeta   ", CTG_CI_FLAG_LEVEL_DB},
    /* CTG_CI_SYSTABLE_META   */ {"SysTblMeta", CTG_CI_FLAG_LEVEL_DB},
    /* CTG_CI_OTHERTABLE_META */ {"OthTblMeta", CTG_CI_FLAG_LEVEL_DB},
    /* CTG_CI_TBL_SMA         */ {"TblSMA    ", CTG_CI_FLAG_LEVEL_DB},
    /* CTG_CI_TBL_CFG         */ {"TblCfg    ", CTG_CI_FLAG_LEVEL_DB},
    /* CTG_CI_INDEX_INFO      */ {"IndexInfo ", CTG_CI_FLAG_LEVEL_DB},
    /* CTG_CI_USER            */ {"User      ", CTG_CI_FLAG_LEVEL_CLUSTER},
    /* CTG_CI_UDF             */ {"UDF       ", CTG_CI_FLAG_LEVEL_CLUSTER},
    /* CTG_CI_SVR_VER         */ {"SvrVer    ", CTG_CI_FLAG_LEVEL_CLUSTER}
};


D
dapan1121 已提交
56 57
/*
 * Take dbCache->vgCache.vgLock in CTG_READ mode and report whether vgroup
 * info is usable.
 *
 * pCtg    - catalog handle (also used by the logging macros)
 * dbCache - db cache entry whose vgroup info is wanted
 * inCache - out: true when the db is not marked deleted and vgInfo is set
 *
 * When *inCache comes back true the read lock is still held and the caller
 * must release it (see ctgRUnlockVgInfo). When it comes back false the lock
 * has already been dropped here. Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgRLockVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache, bool *inCache) {
  bool usable = true;

  CTG_LOCK(CTG_READ, &dbCache->vgCache.vgLock);

  if (dbCache->deleted) {
    CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock);
    ctgDebug("db is dropping, dbId:0x%" PRIx64, dbCache->dbId);
    usable = false;
  } else if (NULL == dbCache->vgCache.vgInfo) {
    CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock);
    ctgDebug("db vgInfo is empty, dbId:0x%" PRIx64, dbCache->dbId);
    usable = false;
  }

  *inCache = usable;
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
81 82
/*
 * Take dbCache->vgCache.vgLock in CTG_WRITE mode.
 *
 * Returns TSDB_CODE_SUCCESS with the write lock held when the db is not
 * marked deleted. If the db is marked deleted, the lock is released again and
 * TSDB_CODE_CTG_DB_DROPPED is reported through CTG_ERR_RET.
 */
int32_t ctgWLockVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache) {
  CTG_LOCK(CTG_WRITE, &dbCache->vgCache.vgLock);

  if (!dbCache->deleted) {
    return TSDB_CODE_SUCCESS;
  }

  ctgDebug("db is dropping, dbId:0x%" PRIx64, dbCache->dbId);
  CTG_UNLOCK(CTG_WRITE, &dbCache->vgCache.vgLock);
  CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
93
/* Release the CTG_READ hold on dbCache->vgCache.vgLock. */
void ctgRUnlockVgInfo(SCtgDBCache *dbCache) {
  CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock);
}
D
dapan1121 已提交
94

dengyihao's avatar
dengyihao 已提交
95
/* Release the CTG_WRITE hold on dbCache->vgCache.vgLock. */
void ctgWUnlockVgInfo(SCtgDBCache *dbCache) {
  CTG_UNLOCK(CTG_WRITE, &dbCache->vgCache.vgLock);
}
D
dapan1121 已提交
96

dengyihao's avatar
dengyihao 已提交
97 98
/*
 * Undo an acquiring lookup of a db cache entry: drop the CTG_READ hold on
 * dbCache->dbLock, then hand the entry back to the pCtg->dbCache hash.
 * dbCache must be non-NULL.
 */
void ctgReleaseDBCache(SCatalog *pCtg, SCtgDBCache *dbCache) {
  // unlock first, then give the hash reference back
  CTG_UNLOCK(CTG_READ, &dbCache->dbLock);
  taosHashRelease(pCtg->dbCache, dbCache);
}
D
dapan1121 已提交
101

dengyihao's avatar
dengyihao 已提交
102
/*
 * Look up a db cache entry by full db name.
 *
 * pCtg    - catalog handle owning the dbCache hash
 * dbFName - db name; if the part after the first '.' is a system db name
 *           (IS_SYS_DBNAME), that suffix is used as the lookup key instead
 * pCache  - out: the entry, or NULL when absent or marked deleted
 * acquire - true: use taosHashAcquire and take dbLock in CTG_READ mode
 *           (caller must later call ctgReleaseDBCache);
 *           false: plain taosHashGet, no lock taken
 *
 * Updates the CTG_CI_DB hit / not-hit counters. Always returns
 * TSDB_CODE_SUCCESS; a miss is signalled only through *pCache == NULL.
 */
int32_t ctgAcquireDBCacheImpl(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache, bool acquire) {
  const char *dot = strchr(dbFName, '.');
  if (dot && IS_SYS_DBNAME(dot + 1)) {
    dbFName = dot + 1;
  }

  size_t       keyLen = strlen(dbFName);
  SCtgDBCache *entry = (SCtgDBCache *)(acquire ? taosHashAcquire(pCtg->dbCache, dbFName, keyLen)
                                               : taosHashGet(pCtg->dbCache, dbFName, keyLen));

  if (NULL == entry) {
    *pCache = NULL;
    CTG_CACHE_NHIT_INC(CTG_CI_DB, 1);
    ctgDebug("db not in cache, dbFName:%s", dbFName);
    return TSDB_CODE_SUCCESS;
  }

  if (acquire) {
    CTG_LOCK(CTG_READ, &entry->dbLock);
  }

  if (entry->deleted) {
    // entry is on its way out: give back whatever was taken above
    if (acquire) {
      ctgReleaseDBCache(pCtg, entry);
    }

    *pCache = NULL;
    CTG_CACHE_NHIT_INC(CTG_CI_DB, 1);
    ctgDebug("db is removing from cache, dbFName:%s", dbFName);
    return TSDB_CODE_SUCCESS;
  }

  *pCache = entry;
  CTG_CACHE_HIT_INC(CTG_CI_DB, 1);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
144
/*
 * Acquiring variant of the db cache lookup: forwards to
 * ctgAcquireDBCacheImpl with acquire == true, so a non-NULL *pCache must be
 * handed back with ctgReleaseDBCache.
 */
int32_t ctgAcquireDBCache(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache) {
  int32_t res = ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, true);
  CTG_RET(res);
}

dengyihao's avatar
dengyihao 已提交
148
/*
 * Non-acquiring variant of the db cache lookup: forwards to
 * ctgAcquireDBCacheImpl with acquire == false (no dbLock is taken and no
 * release is needed).
 */
int32_t ctgGetDBCache(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache) {
  int32_t res = ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, false);
  CTG_RET(res);
}

dengyihao's avatar
dengyihao 已提交
152
/*
 * Counterpart of ctgAcquireVgInfoFromCache: drops the CTG_READ hold on the
 * vgroup lock and then releases the db cache entry. dbCache must be non-NULL.
 */
void ctgReleaseVgInfoToCache(SCatalog *pCtg, SCtgDBCache *dbCache) {
  CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock);
  ctgReleaseDBCache(pCtg, dbCache);
}
D
dapan1121 已提交
156

dengyihao's avatar
dengyihao 已提交
157
/*
 * Give back a table cache entry (if any) and its db cache entry.
 *
 * With a NULL dbCache nothing is done at all. Otherwise, if pCache is
 * non-NULL its metaLock (CTG_READ) is dropped and the entry is released to
 * dbCache->tbCache; the db entry is then released via ctgReleaseDBCache.
 */
void ctgReleaseTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache) {
  if (NULL == dbCache) {
    return;
  }

  if (pCache) {
    CTG_UNLOCK(CTG_READ, &pCache->metaLock);
    taosHashRelease(dbCache->tbCache, pCache);
  }

  ctgReleaseDBCache(pCtg, dbCache);
}
D
dapan1121 已提交
167

dengyihao's avatar
dengyihao 已提交
168
/*
 * Give back a table cache entry whose indexLock is held (if any) and its db
 * cache entry.
 *
 * If both pCache and dbCache are non-NULL, pCache->indexLock (CTG_READ) is
 * dropped and the entry is released to dbCache->tbCache. The db entry, when
 * non-NULL, is then released via ctgReleaseDBCache.
 *
 * The table entry is only touched when dbCache is also non-NULL, matching
 * ctgReleaseTbMetaToCache; previously a non-NULL pCache with a NULL dbCache
 * dereferenced the NULL pointer.
 */
void ctgReleaseTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache) {
  if (pCache && dbCache) {
    CTG_UNLOCK(CTG_READ, &pCache->indexLock);
    taosHashRelease(dbCache->tbCache, pCache);
  }

  if (dbCache) {
    ctgReleaseDBCache(pCtg, dbCache);
  }
}

179
/*
 * Counterpart of ctgAcquireVgMetaFromCache. With a NULL dbCache nothing is
 * done. Otherwise: the table entry (if non-NULL) has its metaLock dropped and
 * is released to dbCache->tbCache, then the vgroup read lock is dropped and
 * the db entry is released.
 */
void ctgReleaseVgMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache) {
  if (NULL == dbCache) {
    return;
  }

  if (pCache) {
    CTG_UNLOCK(CTG_READ, &pCache->metaLock);
    taosHashRelease(dbCache->tbCache, pCache);
  }

  ctgRUnlockVgInfo(dbCache);
  ctgReleaseDBCache(pCtg, dbCache);
}

dengyihao's avatar
dengyihao 已提交
191
/*
 * Acquire a db cache entry together with a read lock on its vgroup info.
 *
 * pCache - out: the db entry on a hit (release with ctgReleaseVgInfoToCache),
 *          NULL on a miss
 *
 * A miss is either "db not cached" or "db cached but vgroup info unusable"
 * (as reported by ctgRLockVgInfo); in the latter case the db entry is
 * released here. The CTG_CI_DB_VGROUP hit / not-hit counter is updated.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgAcquireVgInfoFromCache(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache) {
  SCtgDBCache *db = NULL;
  bool         vgUsable = false;

  ctgAcquireDBCache(pCtg, dbFName, &db);

  if (NULL == db) {
    ctgDebug("db %s not in cache", dbFName);
  } else {
    ctgRLockVgInfo(pCtg, db, &vgUsable);
    if (vgUsable) {
      *pCache = db;
      CTG_CACHE_HIT_INC(CTG_CI_DB_VGROUP, 1);
      ctgDebug("Got db vgInfo from cache, dbFName:%s", dbFName);
      return TSDB_CODE_SUCCESS;
    }

    // ctgRLockVgInfo already dropped the vg lock; only the db entry is left
    ctgDebug("vgInfo of db %s not in cache", dbFName);
    ctgReleaseDBCache(pCtg, db);
  }

  *pCache = NULL;
  CTG_CACHE_NHIT_INC(CTG_CI_DB_VGROUP, 1);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
227
/*
 * Acquire a table's cached meta.
 *
 * On a hit, *pDb and *pTb receive the db and table cache entries; the table
 * entry's metaLock is held in CTG_READ mode and the caller must hand both
 * back with ctgReleaseTbMetaToCache.
 *
 * On a miss (db absent, table absent, or table entry without pMeta)
 * everything taken here is released and *pDb / *pTb are left untouched, so
 * callers should pre-initialise them to NULL.
 *
 * Updates the meta hit / not-hit counters. Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgAcquireTbMetaFromCache(SCatalog *pCtg, char *dbFName, char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb) {
  SCtgTbCache *tb = NULL;
  SCtgDBCache *db = NULL;

  ctgAcquireDBCache(pCtg, dbFName, &db);
  if (NULL == db) {
    ctgDebug("db %s not in cache", dbFName);
    goto _miss;
  }

  tb = taosHashAcquire(db->tbCache, tbName, strlen(tbName));
  if (NULL == tb) {
    ctgDebug("tb %s not in cache, dbFName:%s", tbName, dbFName);
    goto _miss;
  }

  CTG_LOCK(CTG_READ, &tb->metaLock);
  if (NULL == tb->pMeta) {
    ctgDebug("tb %s meta not in cache, dbFName:%s", tbName, dbFName);
    goto _miss;
  }

  *pDb = db;
  *pTb = tb;

  ctgDebug("tb %s meta got in cache, dbFName:%s", tbName, dbFName);
  CTG_META_HIT_INC(tb->pMeta->tableType);

  return TSDB_CODE_SUCCESS;

_miss:

  // handles every partial state: db only, or db + locked table entry
  ctgReleaseTbMetaToCache(pCtg, db, tb);
  CTG_META_NHIT_INC();

  return TSDB_CODE_SUCCESS;
}

266 267 268
/*
 * Acquire, in one call, a db's vgroup info and a table's meta.
 *
 * On a hit, *pDb / *pTb receive the entries with the vgroup read lock and
 * the table's metaLock (CTG_READ) held; release with
 * ctgReleaseVgMetaToCache.
 *
 * On any miss everything taken so far is released and both *pDb and *pTb are
 * set to NULL. Note that *pDb is written as soon as the vgroup info is found
 * and is reset again if the table lookup then misses.
 *
 * Updates the CTG_CI_DB_VGROUP and meta hit / not-hit counters.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgAcquireVgMetaFromCache(SCatalog *pCtg, const char *dbFName, const char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb) {
  bool         vgLocked = false;
  SCtgTbCache *tb = NULL;
  SCtgDBCache *db = NULL;

  ctgAcquireDBCache(pCtg, dbFName, &db);
  if (NULL == db) {
    ctgDebug("db %s not in cache", dbFName);
    CTG_CACHE_NHIT_INC(CTG_CI_DB_VGROUP, 1);
    goto _miss;
  }

  ctgRLockVgInfo(pCtg, db, &vgLocked);
  if (!vgLocked) {
    ctgDebug("vgInfo of db %s not in cache", dbFName);
    CTG_CACHE_NHIT_INC(CTG_CI_DB_VGROUP, 1);
    goto _miss;
  }

  *pDb = db;
  CTG_CACHE_HIT_INC(CTG_CI_DB_VGROUP, 1);
  ctgDebug("Got db vgInfo from cache, dbFName:%s", dbFName);

  tb = taosHashAcquire(db->tbCache, tbName, strlen(tbName));
  if (NULL == tb) {
    ctgDebug("tb %s not in cache, dbFName:%s", tbName, dbFName);
    CTG_META_NHIT_INC();
    goto _miss;
  }

  CTG_LOCK(CTG_READ, &tb->metaLock);
  if (NULL == tb->pMeta) {
    ctgDebug("tb %s meta not in cache, dbFName:%s", tbName, dbFName);
    CTG_META_NHIT_INC();
    goto _miss;
  }

  *pTb = tb;
  ctgDebug("tb %s meta got in cache, dbFName:%s", tbName, dbFName);
  CTG_META_HIT_INC(tb->pMeta->tableType);

  return TSDB_CODE_SUCCESS;

_miss:

  // unwind in reverse order of acquisition
  if (tb) {
    CTG_UNLOCK(CTG_READ, &tb->metaLock);
    taosHashRelease(db->tbCache, tb);
  }

  if (vgLocked) {
    ctgRUnlockVgInfo(db);
  }

  if (db) {
    ctgReleaseDBCache(pCtg, db);
  }

  *pDb = NULL;
  *pTb = NULL;

  return TSDB_CODE_SUCCESS;
}


335
/*
dengyihao's avatar
dengyihao 已提交
336 337 338
int32_t ctgAcquireStbMetaFromCache(SCatalog *pCtg, char *dbFName, uint64_t suid, SCtgDBCache **pDb, SCtgTbCache **pTb) {
  SCtgDBCache *dbCache = NULL;
  SCtgTbCache *pCache = NULL;
D
dapan1121 已提交
339 340 341 342 343
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
  if (NULL == dbCache) {
    ctgDebug("db %s not in cache", dbFName);
    goto _return;
  }
dengyihao's avatar
dengyihao 已提交
344 345

  char *stName = taosHashAcquire(dbCache->stbCache, &suid, sizeof(suid));
D
dapan1121 已提交
346
  if (NULL == stName) {
D
dapan1121 已提交
347
    ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", suid, dbFName);
D
dapan1121 已提交
348 349 350 351 352
    goto _return;
  }

  pCache = taosHashAcquire(dbCache->tbCache, stName, strlen(stName));
  if (NULL == pCache) {
D
dapan1121 已提交
353
    ctgDebug("stb 0x%" PRIx64 " name %s not in cache, dbFName:%s", suid, stName, dbFName);
D
dapan1121 已提交
354 355 356 357
    taosHashRelease(dbCache->stbCache, stName);
    goto _return;
  }

D
dapan1121 已提交
358 359
  taosHashRelease(dbCache->stbCache, stName);

D
dapan1121 已提交
360 361
  CTG_LOCK(CTG_READ, &pCache->metaLock);
  if (NULL == pCache->pMeta) {
D
dapan1121 已提交
362
    ctgDebug("stb 0x%" PRIx64 " meta not in cache, dbFName:%s", suid, dbFName);
D
dapan1121 已提交
363 364 365 366 367 368
    goto _return;
  }

  *pDb = dbCache;
  *pTb = pCache;

D
dapan1121 已提交
369
  ctgDebug("stb 0x%" PRIx64 " meta got in cache, dbFName:%s", suid, dbFName);
dengyihao's avatar
dengyihao 已提交
370

D
dapan1121 已提交
371
  CTG_META_HIT_INC(pCache->pMeta->tableType, 1);
D
dapan1121 已提交
372 373 374 375 376 377 378

  return TSDB_CODE_SUCCESS;

_return:

  ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);

D
dapan1121 已提交
379
  CTG_META_NHIT_INC(1);
D
dapan1121 已提交
380 381 382

  *pDb = NULL;
  *pTb = NULL;
dengyihao's avatar
dengyihao 已提交
383

D
dapan1121 已提交
384 385
  return TSDB_CODE_SUCCESS;
}
386
*/
D
dapan1121 已提交
387

388
/*
 * Acquire a super table's cached meta by suid, using an already acquired db
 * cache entry.
 *
 * The suid is first mapped to a table name through dbCache->stbCache, then
 * the name is looked up in dbCache->tbCache.
 *
 * On a hit, *pTb receives the table entry with its metaLock held in CTG_READ
 * mode; dbCache stays acquired.
 *
 * On a miss, *pTb is set to NULL and ctgReleaseTbMetaToCache is called, which
 * ALSO releases dbCache -- the caller must not release it again.
 *
 * Updates the meta hit / not-hit counters. Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgAcquireStbMetaFromCache(SCtgDBCache *dbCache, SCatalog *pCtg, char *dbFName, uint64_t suid, SCtgTbCache **pTb) {
  SCtgTbCache *stb = NULL;

  char *stName = taosHashAcquire(dbCache->stbCache, &suid, sizeof(suid));
  if (NULL == stName) {
    ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", suid, dbFName);
    goto _miss;
  }

  stb = taosHashAcquire(dbCache->tbCache, stName, strlen(stName));
  if (NULL == stb) {
    ctgDebug("stb 0x%" PRIx64 " name %s not in cache, dbFName:%s", suid, stName, dbFName);
  }

  // the name is no longer needed whether or not the table entry was found
  taosHashRelease(dbCache->stbCache, stName);

  if (NULL == stb) {
    goto _miss;
  }

  CTG_LOCK(CTG_READ, &stb->metaLock);
  if (NULL == stb->pMeta) {
    ctgDebug("stb 0x%" PRIx64 " meta not in cache, dbFName:%s", suid, dbFName);
    goto _miss;
  }

  *pTb = stb;

  ctgDebug("stb 0x%" PRIx64 " meta got in cache, dbFName:%s", suid, dbFName);
  CTG_META_HIT_INC(stb->pMeta->tableType);

  return TSDB_CODE_SUCCESS;

_miss:

  ctgReleaseTbMetaToCache(pCtg, dbCache, stb);
  CTG_META_NHIT_INC();

  *pTb = NULL;

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
430
/*
 * Acquire a table's cached index info.
 *
 * On a hit, *pDb and *pTb receive the db and table cache entries; the table
 * entry's indexLock is held in CTG_READ mode and the caller must hand both
 * back with ctgReleaseTbIndexToCache.
 *
 * On a miss (db absent, table absent, or table entry without pIndex)
 * everything taken here is released and *pDb / *pTb are left untouched, so
 * callers should pre-initialise them to NULL.
 *
 * Updates the CTG_CI_TBL_SMA hit / not-hit counter.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgAcquireTbIndexFromCache(SCatalog *pCtg, char *dbFName, char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb) {
  SCtgDBCache *dbCache = NULL;
  SCtgTbCache *pCache = NULL;

  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
  if (NULL == dbCache) {
    ctgDebug("db %s not in cache", dbFName);
    goto _return;
  }

  pCache = taosHashAcquire(dbCache->tbCache, tbName, strlen(tbName));
  if (NULL == pCache) {
    ctgDebug("tb %s not in cache, dbFName:%s", tbName, dbFName);
    goto _return;
  }

  CTG_LOCK(CTG_READ, &pCache->indexLock);
  if (NULL == pCache->pIndex) {
    ctgDebug("tb %s index not in cache, dbFName:%s", tbName, dbFName);
    goto _return;
  }

  *pDb = dbCache;
  *pTb = pCache;

  ctgDebug("tb %s index got in cache, dbFName:%s", tbName, dbFName);

  CTG_CACHE_HIT_INC(CTG_CI_TBL_SMA, 1);

  return TSDB_CODE_SUCCESS;

_return:

  // releases whatever subset (db only, or db + locked table entry) was taken
  ctgReleaseTbIndexToCache(pCtg, dbCache, pCache);

  CTG_CACHE_NHIT_INC(CTG_CI_TBL_SMA, 1);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
470
/*
 * Report whether a table's meta is currently cached.
 *
 * exist - out: 1 when ctgAcquireTbMetaFromCache finds the meta, 0 otherwise
 *
 * Anything acquired during the probe is released before returning.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgTbMetaExistInCache(SCatalog *pCtg, char *dbFName, char *tbName, int32_t *exist) {
  SCtgTbCache *tbCache = NULL;
  SCtgDBCache *dbCache = NULL;

  ctgAcquireTbMetaFromCache(pCtg, dbFName, tbName, &dbCache, &tbCache);

  *exist = (NULL == tbCache) ? 0 : 1;

  // no-op on a miss (both pointers are still NULL), full release on a hit
  ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
487 488
/*
 * Copy a cached table meta into a freshly allocated STableMeta.
 *
 * pCtg       - catalog handle
 * ctx        - lookup context; ctx->tbInfo (inCache, dbId, suid, tbType) is
 *              filled from the cached entry
 * pDb / pTb  - in: acquired db entry and table entry (metaLock held);
 *              out: updated to reflect what is still held on return
 * pTableMeta - out: allocated copy, or NULL (caller frees)
 * dbFName    - full db name, used for logging and the stable lookup
 *
 * Non-child tables: the whole meta (CTG_META_SIZE bytes) is copied.
 *
 * Child tables: the SCTableMeta prefix is copied, the child entry is
 * released (*pTb = NULL) and the super table is looked up by suid.
 *   - stable not cached: the copy is freed, *pDb is set to NULL (the db entry
 *     was already released by ctgAcquireStbMetaFromCache) and success is
 *     returned with *pTableMeta == NULL.
 *   - stable cached: *pTb is set to the stable entry, the buffer is grown to
 *     the stable's meta size and everything from sversion onwards is copied
 *     from the stable meta.
 *
 * On error return, entries still referenced by *pDb / *pTb remain held and
 * must be released by the caller; *pTableMeta is NULL.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY or
 * TSDB_CODE_CTG_INTERNAL_ERROR (stable suid mismatch).
 */
int32_t ctgCopyTbMeta(SCatalog *pCtg, SCtgTbMetaCtx *ctx, SCtgDBCache **pDb, SCtgTbCache **pTb, STableMeta **pTableMeta, char* dbFName) {
  SCtgDBCache *dbCache = *pDb;
  SCtgTbCache *tbCache = *pTb;
  STableMeta  *tbMeta = tbCache->pMeta;

  ctx->tbInfo.inCache = true;
  ctx->tbInfo.dbId = dbCache->dbId;
  ctx->tbInfo.suid = tbMeta->suid;
  ctx->tbInfo.tbType = tbMeta->tableType;

  if (tbMeta->tableType != TSDB_CHILD_TABLE) {
    int32_t metaSize = CTG_META_SIZE(tbMeta);
    *pTableMeta = taosMemoryCalloc(1, metaSize);
    if (NULL == *pTableMeta) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    memcpy(*pTableMeta, tbMeta, metaSize);

    ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", ctx->pName->tname, tbMeta->tableType, dbFName);
    return TSDB_CODE_SUCCESS;
  }

  // PROCESS FOR CHILD TABLE

  int32_t metaSize = sizeof(SCTableMeta);
  *pTableMeta = taosMemoryCalloc(1, metaSize);
  if (NULL == *pTableMeta) {
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  memcpy(*pTableMeta, tbMeta, metaSize);

  // Release only the child table entry; the db entry is still needed for the
  // stable lookup below.
  CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
  taosHashRelease(dbCache->tbCache, tbCache);
  *pTb = NULL;

  ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", ctx->pName->tname,
           ctx->tbInfo.tbType, dbFName);

  ctgAcquireStbMetaFromCache(dbCache, pCtg, dbFName, ctx->tbInfo.suid, &tbCache);
  if (NULL == tbCache) {
    taosMemoryFreeClear(*pTableMeta);
    // ctgAcquireStbMetaFromCache released the db entry on its miss path
    *pDb = NULL;
    ctgDebug("stb 0x%" PRIx64 " meta not in cache", ctx->tbInfo.suid);
    return TSDB_CODE_SUCCESS;
  }

  *pTb = tbCache;

  STableMeta *stbMeta = tbCache->pMeta;
  if (stbMeta->suid != ctx->tbInfo.suid) {
    ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%" PRIx64, stbMeta->suid, ctx->tbInfo.suid);
    taosMemoryFreeClear(*pTableMeta);
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  metaSize = CTG_META_SIZE(stbMeta);

  // Grow through a temporary: on failure the original buffer is still valid
  // and must be freed here, otherwise it would leak once *pTableMeta is NULL.
  STableMeta *grown = taosMemoryRealloc(*pTableMeta, metaSize);
  if (NULL == grown) {
    taosMemoryFreeClear(*pTableMeta);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }
  *pTableMeta = grown;

  // keep the child-specific prefix, take schema data from the stable
  memcpy(&(*pTableMeta)->sversion, &stbMeta->sversion, metaSize - sizeof(SCTableMeta));

  return TSDB_CODE_SUCCESS;
}


/*
 * Read a table's meta from the cache into a newly allocated copy.
 *
 * ctx        - lookup context; ctx->pName names the table, ctx->flag selects
 *              whether the plain dbname (system db) or the full db name is
 *              used as the cache key
 * pTableMeta - out: allocated copy on a hit, NULL on a miss or error
 *
 * Returns TSDB_CODE_SUCCESS on both hit and miss; an error from
 * ctgCopyTbMeta is propagated after releasing the cache entries.
 */
int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta **pTableMeta) {
  char         dbFName[TSDB_DB_FNAME_LEN] = {0};
  SCtgTbCache *tbCache = NULL;
  SCtgDBCache *dbCache = NULL;
  int32_t      code = 0;

  *pTableMeta = NULL;

  if (!CTG_FLAG_IS_SYS_DB(ctx->flag)) {
    tNameGetFullDbName(ctx->pName, dbFName);
  } else {
    strcpy(dbFName, ctx->pName->dbname);
  }

  ctgAcquireTbMetaFromCache(pCtg, dbFName, ctx->pName->tname, &dbCache, &tbCache);
  if (NULL == tbCache) {
    ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
    return TSDB_CODE_SUCCESS;
  }

  // may update dbCache / tbCache to whatever is still held afterwards
  CTG_ERR_JRET(ctgCopyTbMeta(pCtg, ctx, &dbCache, &tbCache, pTableMeta, dbFName));

  ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);

  ctgDebug("Got tb %s meta from cache, dbFName:%s", ctx->pName->tname, dbFName);

  return TSDB_CODE_SUCCESS;

_return:

  ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
  taosMemoryFreeClear(*pTableMeta);
  *pTableMeta = NULL;

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
593 594
/*
 * Read a table's schema / tag versions from the cache.
 *
 * pTableName - table to look up
 * sver, tver - out: schema and tag version; both -1 when nothing is cached
 * tbType     - out: table type (written only when the table is cached)
 * suid       - out: super table uid (written only when the table is cached)
 * stbName    - out: for child tables, the cached key of the super table,
 *              NUL-terminated; the buffer size is not checked here, so the
 *              caller must provide room for the key plus terminator
 *
 * For non-child tables the versions come from the table's own meta. For
 * child tables they come from the super table's meta, found by suid; if the
 * super table is not cached, sver/tver stay -1.
 *
 * All cache entries acquired here are released before returning.
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CTG_INTERNAL_ERROR on a stable
 * suid mismatch.
 */
int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver, int32_t *tver, int32_t *tbType,
                              uint64_t *suid, char *stbName) {
  *sver = -1;
  *tver = -1;

  SCtgDBCache *dbCache = NULL;
  SCtgTbCache *tbCache = NULL;
  char         dbFName[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, dbFName);

  ctgAcquireTbMetaFromCache(pCtg, dbFName, pTableName->tname, &dbCache, &tbCache);
  if (NULL == tbCache) {
    ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
    return TSDB_CODE_SUCCESS;
  }

  STableMeta *tbMeta = tbCache->pMeta;
  *tbType = tbMeta->tableType;
  *suid = tbMeta->suid;

  if (*tbType != TSDB_CHILD_TABLE) {
    *sver = tbMeta->sversion;
    *tver = tbMeta->tversion;

    ctgDebug("Got tb %s ver from cache, dbFName:%s, tbType:%d, sver:%d, tver:%d, suid:0x%" PRIx64, pTableName->tname,
             dbFName, *tbType, *sver, *tver, *suid);

    ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
    return TSDB_CODE_SUCCESS;
  }

  // PROCESS FOR CHILD TABLE

  // Release only the child table entry (tbCache is known non-NULL here); the
  // db entry is still needed for the stable lookup.
  CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
  taosHashRelease(dbCache->tbCache, tbCache);

  ctgDebug("Got ctb %s ver from cache, will continue to get its stb ver, dbFName:%s", pTableName->tname, dbFName);

  ctgAcquireStbMetaFromCache(dbCache, pCtg, dbFName, *suid, &tbCache);
  if (NULL == tbCache) {
    // the db entry was already released by ctgAcquireStbMetaFromCache
    ctgDebug("stb 0x%" PRIx64 " meta not in cache", *suid);
    return TSDB_CODE_SUCCESS;
  }

  STableMeta *stbMeta = tbCache->pMeta;
  if (stbMeta->suid != *suid) {
    // Log while the entry is still locked: stbMeta points into the cache and
    // must not be read after the entry has been released.
    ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid:0x%" PRIx64, stbMeta->suid, *suid);
    ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  size_t nameLen = 0;
  char  *name = taosHashGetKey(tbCache, &nameLen);

  strncpy(stbName, name, nameLen);
  stbName[nameLen] = 0;

  *sver = stbMeta->sversion;
  *tver = stbMeta->tversion;

  ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);

  ctgDebug("Got tb %s sver %d tver %d from cache, type:%d, dbFName:%s", pTableName->tname, *sver, *tver, *tbType,
           dbFName);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
665
/*
 * Read a table's type from its cached meta.
 *
 * tbType - out: written only when the table meta is cached; left untouched
 *          on a miss
 *
 * Cache entries acquired here are released before returning. Returns
 * TSDB_CODE_SUCCESS on hit or miss; a failure code from
 * ctgAcquireTbMetaFromCache would be passed through by CTG_ERR_RET.
 */
int32_t ctgReadTbTypeFromCache(SCatalog *pCtg, char *dbFName, char *tbName, int32_t *tbType) {
  SCtgTbCache *tbCache = NULL;
  SCtgDBCache *dbCache = NULL;

  CTG_ERR_RET(ctgAcquireTbMetaFromCache(pCtg, dbFName, tbName, &dbCache, &tbCache));

  if (NULL != tbCache) {
    *tbType = tbCache->pMeta->tableType;
    ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);

    ctgDebug("Got tb %s tbType %d from cache, dbFName:%s", tbName, *tbType, dbFName);
    return TSDB_CODE_SUCCESS;
  }

  ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
682 683
/*
 * Read a table's index list from the cache.
 *
 * pRes - out: set to NULL first; on a hit it receives whatever
 *        ctgCloneTableIndex produces from the cached index array
 *
 * Cache entries acquired here are released on every path. Returns
 * TSDB_CODE_SUCCESS on hit or miss, or the error from ctgCloneTableIndex.
 */
int32_t ctgReadTbIndexFromCache(SCatalog *pCtg, SName *pTableName, SArray **pRes) {
  char         dbFName[TSDB_DB_FNAME_LEN] = {0};
  SCtgTbCache *tbCache = NULL;
  SCtgDBCache *dbCache = NULL;
  int32_t      code = 0;

  *pRes = NULL;
  tNameGetFullDbName(pTableName, dbFName);

  ctgAcquireTbIndexFromCache(pCtg, dbFName, pTableName->tname, &dbCache, &tbCache);
  if (NULL == tbCache) {
    ctgReleaseTbIndexToCache(pCtg, dbCache, tbCache);
    return TSDB_CODE_SUCCESS;
  }

  CTG_ERR_JRET(ctgCloneTableIndex(tbCache->pIndex->pIndex, pRes));

_return:

  // shared by the success and failure paths of the clone above
  ctgReleaseTbIndexToCache(pCtg, dbCache, tbCache);

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
706
/*
 * Check a user's access to a db using only cached auth data.
 *
 * user    - user name, key into pCtg->userCache
 * dbFName - db name; the part after the first '.' (or the whole string when
 *           there is no '.') is tested with IS_SYS_DBNAME
 * type    - requested access, tested with CTG_AUTH_READ / CTG_AUTH_WRITE
 * inCache - out: false when the user is not cached, true otherwise
 * pass    - out: set to true when access is granted
 *
 * Access is granted when: the db is a system db; the user is a super user;
 * the db is in the user's createdDbs; it is in readDbs and read access is
 * requested; or it is in writeDbs and write access is requested.
 *
 * NOTE(review): *pass is only ever written with true here, so callers
 * presumably initialise it to false -- confirm at the call sites.
 *
 * Updates the CTG_CI_USER hit / not-hit counter (not for the system-db
 * shortcut). Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgChkAuthFromCache(SCatalog *pCtg, char *user, char *dbFName, AUTH_TYPE type, bool *inCache, bool *pass) {
  char *dot = strchr(dbFName, '.');
  char *dbName = dot ? dot + 1 : dbFName;

  if (IS_SYS_DBNAME(dbName)) {
    *inCache = true;
    *pass = true;
    ctgDebug("sysdb %s, pass", dbFName);
    return TSDB_CODE_SUCCESS;
  }

  SCtgUserAuth *pUser = (SCtgUserAuth *)taosHashGet(pCtg->userCache, user, strlen(user));
  if (NULL == pUser) {
    ctgDebug("user not in cache, user:%s", user);
    *inCache = false;
    CTG_CACHE_NHIT_INC(CTG_CI_USER, 1);
    return TSDB_CODE_SUCCESS;
  }

  *inCache = true;
  ctgDebug("Got user from cache, user:%s", user);
  CTG_CACHE_HIT_INC(CTG_CI_USER, 1);

  if (pUser->superUser) {
    *pass = true;
    return TSDB_CODE_SUCCESS;
  }

  size_t dbLen = strlen(dbFName);

  // the per-db sets are read under the user's lock
  CTG_LOCK(CTG_READ, &pUser->lock);

  if (pUser->createdDbs && taosHashGet(pUser->createdDbs, dbFName, dbLen)) {
    *pass = true;
  } else {
    if (pUser->readDbs && taosHashGet(pUser->readDbs, dbFName, dbLen) && CTG_AUTH_READ(type)) {
      *pass = true;
    }

    if (pUser->writeDbs && taosHashGet(pUser->writeDbs, dbFName, dbLen) && CTG_AUTH_WRITE(type)) {
      *pass = true;
    }
  }

  CTG_UNLOCK(CTG_READ, &pUser->lock);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
764
/*
 * Pop the next operation from the global catalog queue.
 *
 * The current head node is released and its successor becomes the new head;
 * the successor's operation is handed back through *op. The successor is
 * dereferenced unconditionally, so the caller must only call this when a node
 * follows the head.
 */
void ctgDequeue(SCtgCacheOperation **op) {
  SCtgQNode *stale = gCtgMgmt.queue.head;
  SCtgQNode *next = stale->next;

  gCtgMgmt.queue.head = next;

  CTG_QUEUE_DEC();

  taosMemoryFreeClear(stale);

  *op = next->op;
}

dengyihao's avatar
dengyihao 已提交
777
/*
 * Append `operation` to the global catalog queue and wake the consumer.
 *
 * Failure paths:
 *   - queue node allocation fails: operation->data and operation are freed and
 *     TSDB_CODE_OUT_OF_MEMORY is returned;
 *   - the queue has been stopped: the node is released via ctgFreeQNode() and
 *     TSDB_CODE_CTG_EXIT is returned.
 *
 * For a sync operation the function blocks on operation->rspSem and frees
 * `operation` afterwards. Unless operation->unLocked is set, the read lock on
 * gCtgMgmt.lock is dropped for the duration of the wait and re-taken after it.
 */
int32_t ctgEnqueue(SCatalog *pCtg, SCtgCacheOperation *operation) {
  SCtgQNode *qnode = taosMemoryCalloc(1, sizeof(SCtgQNode));
  if (NULL == qnode) {
    qError("calloc %d failed", (int32_t)sizeof(SCtgQNode));
    taosMemoryFree(operation->data);
    taosMemoryFree(operation);
    CTG_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  // Snapshot the fields used after the node is published.
  // NOTE(review): presumably an async operation may be consumed and released by
  // the queue worker as soon as it is linked in - confirm.
  bool  waitForRsp = operation->syncOp;
  char *opName = gCtgCacheOperation[operation->opId].name;

  if (waitForRsp) {
    tsem_init(&operation->rspSem, 0, 0);
  }

  qnode->op = operation;

  CTG_LOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);

  if (gCtgMgmt.queue.stopQueue) {
    ctgFreeQNode(qnode);
    CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);
    CTG_RET(TSDB_CODE_CTG_EXIT);
  }

  // Link the node at the tail; an operation flagged stopQueue closes the queue
  // for every later enqueue attempt.
  gCtgMgmt.queue.tail->next = qnode;
  gCtgMgmt.queue.tail = qnode;
  gCtgMgmt.queue.stopQueue = operation->stopQueue;

  CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);

  ctgDebug("action [%s] added into queue", opName);

  CTG_QUEUE_INC();
  CTG_STAT_RT_INC(numOfOpEnqueue, 1);

  tsem_post(&gCtgMgmt.queue.reqSem);

  if (!waitForRsp) {
    return TSDB_CODE_SUCCESS;
  }

  if (!operation->unLocked) {
    CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock);
  }

  tsem_wait(&operation->rspSem);

  if (!operation->unLocked) {
    CTG_LOCK(CTG_READ, &gCtgMgmt.lock);
  }

  taosMemoryFree(operation);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
830 831
/*
 * Queue a synchronous CTG_OP_DROP_DB_CACHE operation for `dbFName` / `dbId`.
 *
 * The db name is copied into the message. When the text after the first '.'
 * is a system db name, only that part is stored.
 *
 * Returns TSDB_CODE_SUCCESS when ctgEnqueue() succeeds,
 * TSDB_CODE_OUT_OF_MEMORY when the operation or its message cannot be
 * allocated, otherwise the error reported by ctgEnqueue(). Once handed to
 * ctgEnqueue(), `op` and `msg` are not released here.
 */
int32_t ctgDropDbCacheEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Bail out before op is dereferenced below.
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_DROP_DB_CACHE;
  op->syncOp = true;

  SCtgDropDBMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDBMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDBMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  // System dbs are stored without the leading "<prefix>." part.
  const char *p = strchr(dbFName, '.');
  if (p && IS_SYS_DBNAME(p + 1)) {
    dbFName = p + 1;
  }

  msg->pCtg = pCtg;
  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  msg->dbId = dbId;

  op->data = msg;

  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
863 864
/*
 * Queue a CTG_OP_DROP_DB_VGROUP operation for `dbFName`.
 *
 * `syncOp` selects whether ctgEnqueue() waits for the operation to be
 * processed. When the text after the first '.' in `dbFName` is a system db
 * name, only that part is stored in the message.
 *
 * Returns TSDB_CODE_SUCCESS when ctgEnqueue() succeeds,
 * TSDB_CODE_OUT_OF_MEMORY when the operation or its message cannot be
 * allocated, otherwise the error reported by ctgEnqueue().
 */
int32_t ctgDropDbVgroupEnqueue(SCatalog *pCtg, const char *dbFName, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Bail out before op is dereferenced below.
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_DROP_DB_VGROUP;
  op->syncOp = syncOp;

  SCtgDropDbVgroupMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDbVgroupMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDbVgroupMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  // System dbs are stored without the leading "<prefix>." part.
  const char *p = strchr(dbFName, '.');
  if (p && IS_SYS_DBNAME(p + 1)) {
    dbFName = p + 1;
  }

  msg->pCtg = pCtg;
  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));

  op->data = msg;

  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
895 896 897
/*
 * Queue a CTG_OP_DROP_STB_META operation identifying a super table by db name,
 * db id, table name and suid.
 *
 * `syncOp` selects whether ctgEnqueue() waits for the operation to be
 * processed. Both names are copied (bounded) into the message.
 *
 * Returns TSDB_CODE_SUCCESS when ctgEnqueue() succeeds,
 * TSDB_CODE_OUT_OF_MEMORY when the operation or its message cannot be
 * allocated, otherwise the error reported by ctgEnqueue().
 */
int32_t ctgDropStbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid,
                              bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Bail out before op is dereferenced below.
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_DROP_STB_META;
  op->syncOp = syncOp;

  SCtgDropStbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropStbMetaMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropStbMetaMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  tstrncpy(msg->stbName, stbName, sizeof(msg->stbName));
  msg->dbId = dbId;
  msg->suid = suid;

  op->data = msg;

  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
926 927
/*
 * Queue a CTG_OP_DROP_TB_META operation identifying a table by db name, db id
 * and table name.
 *
 * `syncOp` selects whether ctgEnqueue() waits for the operation to be
 * processed. Both names are copied (bounded) into the message.
 *
 * Returns TSDB_CODE_SUCCESS when ctgEnqueue() succeeds,
 * TSDB_CODE_OUT_OF_MEMORY when the operation or its message cannot be
 * allocated, otherwise the error reported by ctgEnqueue().
 */
int32_t ctgDropTbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Bail out before op is dereferenced below.
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_DROP_TB_META;
  op->syncOp = syncOp;

  SCtgDropTblMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTblMetaMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTblMetaMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  tstrncpy(msg->tbName, tbName, sizeof(msg->tbName));
  msg->dbId = dbId;

  op->data = msg;

  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
955 956
/*
 * Queue a CTG_OP_UPDATE_VGROUP operation carrying `dbInfo` for `dbFName`.
 *
 * `dbInfo` is released with freeVgInfo() on every failure path of this
 * function, including a failed ctgEnqueue().
 *
 * When the text after the first '.' in `dbFName` is a system db name, only
 * that part is stored in the message. ctgMakeVgArray() is run on `dbInfo`
 * before it is queued.
 *
 * Returns TSDB_CODE_SUCCESS when ctgEnqueue() succeeds,
 * TSDB_CODE_OUT_OF_MEMORY on allocation failure, otherwise the error from
 * ctgMakeVgArray() or ctgEnqueue().
 */
int32_t ctgUpdateVgroupEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, SDBVgInfo *dbInfo, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Bail out before op is dereferenced; dbInfo is released like on the other
    // failure paths.
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    freeVgInfo(dbInfo);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_UPDATE_VGROUP;
  op->syncOp = syncOp;

  SCtgUpdateVgMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateVgMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg));
    taosMemoryFree(op);
    freeVgInfo(dbInfo);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  // System dbs are stored without the leading "<prefix>." part.
  const char *p = strchr(dbFName, '.');
  if (p && IS_SYS_DBNAME(p + 1)) {
    dbFName = p + 1;
  }

  code = ctgMakeVgArray(dbInfo);
  if (code) {
    taosMemoryFree(op);
    taosMemoryFree(msg);
    freeVgInfo(dbInfo);
    CTG_ERR_RET(code);
  }

  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  msg->pCtg = pCtg;
  msg->dbId = dbId;
  msg->dbInfo = dbInfo;

  op->data = msg;

  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  freeVgInfo(dbInfo);
  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
999 1000
/*
 * Queue a CTG_OP_UPDATE_TB_META operation carrying `output`.
 *
 * On every failure path of this function `output->tbMeta` and `output` are
 * freed. When the text after the first '.' in output->dbFName is a system db
 * name, output->dbFName is rewritten in place to hold only that part.
 *
 * Returns TSDB_CODE_SUCCESS when ctgEnqueue() succeeds,
 * TSDB_CODE_OUT_OF_MEMORY on allocation failure, otherwise the error reported
 * by ctgEnqueue().
 */
int32_t ctgUpdateTbMetaEnqueue(SCatalog *pCtg, STableMetaOutput *output, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Bail out before op is dereferenced; _return releases output.
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_UPDATE_TB_META;
  op->syncOp = syncOp;

  SCtgUpdateTbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbMetaMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbMetaMsg));
    taosMemoryFree(op);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  char *p = strchr(output->dbFName, '.');
  if (p && IS_SYS_DBNAME(p + 1)) {
    // Shift the system db name to the front of the buffer and terminate it
    // explicitly; without the terminator the tail of the old, longer string
    // would remain appended to the name.
    size_t len = strlen(p + 1);
    if (len >= TSDB_DB_FNAME_LEN) {
      len = TSDB_DB_FNAME_LEN - 1;
    }
    memmove(output->dbFName, p + 1, len);
    output->dbFName[len] = '\0';
  }

  msg->pCtg = pCtg;
  msg->pMeta = output;

  op->data = msg;

  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  if (output) {
    taosMemoryFree(output->tbMeta);
    taosMemoryFree(output);
  }

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1037 1038
/*
 * Queue a CTG_OP_UPDATE_VG_EPSET operation that carries a copy of `*pEpSet`
 * for vgroup `vgId` of `dbFName`.
 *
 * The operation's syncOp flag is left at its zero-initialised value.
 *
 * Returns TSDB_CODE_SUCCESS when ctgEnqueue() succeeds,
 * TSDB_CODE_OUT_OF_MEMORY when the operation or its message cannot be
 * allocated, otherwise the error reported by ctgEnqueue().
 */
int32_t ctgUpdateVgEpsetEnqueue(SCatalog *pCtg, char *dbFName, int32_t vgId, SEpSet *pEpSet) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Bail out before op is dereferenced below.
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_UPDATE_VG_EPSET;

  SCtgUpdateEpsetMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateEpsetMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateEpsetMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  msg->vgId = vgId;
  msg->epSet = *pEpSet;

  op->data = msg;

  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1065 1066
/*
 * Queue a CTG_OP_UPDATE_USER operation carrying a by-value copy of `*pAuth`.
 *
 * On every failure path of this function the contents of `pAuth` are released
 * with tFreeSGetUserAuthRsp(), so the caller must not use them after an error.
 *
 * Returns TSDB_CODE_SUCCESS when ctgEnqueue() succeeds,
 * TSDB_CODE_OUT_OF_MEMORY on allocation failure, otherwise the error reported
 * by ctgEnqueue().
 */
int32_t ctgUpdateUserEnqueue(SCatalog *pCtg, SGetUserAuthRsp *pAuth, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Bail out before op is dereferenced; _return releases pAuth.
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_UPDATE_USER;
  op->syncOp = syncOp;

  SCtgUpdateUserMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateUserMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateUserMsg));
    taosMemoryFree(op);
    // Go through _return so pAuth is released here as well, matching the
    // enqueue-failure path.
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  msg->userAuth = *pAuth;

  op->data = msg;

  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  tFreeSGetUserAuthRsp(pAuth);

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1094 1095
/*
 * Queue a CTG_OP_UPDATE_TB_INDEX operation carrying `*pIndex`.
 *
 * On success `*pIndex` is set to NULL because the queued message now refers to
 * the index. On every failure path of this function the index array and the
 * index object are freed and `*pIndex` is cleared.
 *
 * Returns TSDB_CODE_SUCCESS when ctgEnqueue() succeeds,
 * TSDB_CODE_OUT_OF_MEMORY on allocation failure, otherwise the error reported
 * by ctgEnqueue().
 */
int32_t ctgUpdateTbIndexEnqueue(SCatalog *pCtg, STableIndex **pIndex, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Bail out before op is dereferenced; _return releases *pIndex.
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_UPDATE_TB_INDEX;
  op->syncOp = syncOp;

  SCtgUpdateTbIndexMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbIndexMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbIndexMsg));
    taosMemoryFree(op);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  msg->pIndex = *pIndex;

  op->data = msg;

  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  *pIndex = NULL;
  return TSDB_CODE_SUCCESS;

_return:

  taosArrayDestroyEx((*pIndex)->pIndex, tFreeSTableIndexInfo);
  taosMemoryFreeClear(*pIndex);

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1125 1126
/*
 * Queue a CTG_OP_DROP_TB_INDEX operation for the table named by `pName`.
 *
 * The full db name is derived with tNameGetFullDbName() and the table name is
 * copied (bounded) from pName->tname.
 *
 * Returns TSDB_CODE_SUCCESS when ctgEnqueue() succeeds,
 * TSDB_CODE_OUT_OF_MEMORY when the operation or its message cannot be
 * allocated, otherwise the error reported by ctgEnqueue().
 */
int32_t ctgDropTbIndexEnqueue(SCatalog *pCtg, SName *pName, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Bail out before op is dereferenced below.
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_DROP_TB_INDEX;
  op->syncOp = syncOp;

  SCtgDropTbIndexMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTbIndexMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTbIndexMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  tNameGetFullDbName(pName, msg->dbFName);
  // Bounded copy, matching the other enqueue helpers in this file.
  // NOTE(review): relies on msg->tbName being an in-struct char array - confirm.
  tstrncpy(msg->tbName, pName->tname, sizeof(msg->tbName));

  op->data = msg;

  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1153 1154
/*
 * Queue a CTG_OP_CLEAR_CACHE operation.
 *
 *   freeCtg   - forwarded to the handler through the message.
 *   stopQueue - copied onto the operation; ctgEnqueue() stores it in the queue
 *               so that later enqueue attempts are rejected when it is true.
 *   syncOp    - whether ctgEnqueue() waits for the operation to be processed.
 *
 * The operation is marked `unLocked`, so ctgEnqueue() does not touch
 * gCtgMgmt.lock while waiting.
 *
 * Returns TSDB_CODE_SUCCESS when ctgEnqueue() succeeds,
 * TSDB_CODE_OUT_OF_MEMORY when the operation or its message cannot be
 * allocated, otherwise the error reported by ctgEnqueue().
 */
int32_t ctgClearCacheEnqueue(SCatalog *pCtg, bool freeCtg, bool stopQueue, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Bail out before op is dereferenced below.
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_CLEAR_CACHE;
  op->syncOp = syncOp;
  op->stopQueue = stopQueue;
  op->unLocked = true;

  SCtgClearCacheMsg *msg = taosMemoryMalloc(sizeof(SCtgClearCacheMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgClearCacheMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  msg->freeCtg = freeCtg;
  op->data = msg;

  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

D
dapan1121 已提交
1181 1182 1183 1184 1185 1186
/*
 * Initialise a rent manager: reset the read index, derive the slot count from
 * `rentSec` (one slot per CTG_RENT_SLOT_SECOND) and allocate the zeroed slot
 * array.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the slot array
 * cannot be allocated.
 */
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) {
  mgmt->type = type;
  mgmt->slotRIdx = 0;
  mgmt->slotNum = rentSec / CTG_RENT_SLOT_SECOND;

  size_t slotBytes = sizeof(SCtgRentSlot) * mgmt->slotNum;

  mgmt->slots = taosMemoryCalloc(1, slotBytes);
  if (!mgmt->slots) {
    qError("calloc %d failed", (int32_t)slotBytes);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  qDebug("meta rent initialized, type:%d, slotNum:%d", type, mgmt->slotNum);

  return TSDB_CODE_SUCCESS;
}

/*
 * Append one `size`-byte record (`meta`) to the rent slot selected by
 * |id % slotNum|, creating the slot's array on first use. The slot is flagged
 * as needing a sort afterwards.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the array cannot
 * be created or grown. The slot's write lock is held for the whole update.
 */
int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size) {
  int32_t       code = 0;
  int16_t       slotIdx = abs((int)(id % mgmt->slotNum));
  SCtgRentSlot *pSlot = &mgmt->slots[slotIdx];

  CTG_LOCK(CTG_WRITE, &pSlot->lock);

  if (!pSlot->meta) {
    // Lazily create the backing array for this slot.
    pSlot->meta = taosArrayInit(CTG_DEFAULT_RENT_SLOT_SIZE, size);
    if (!pSlot->meta) {
      qError("taosArrayInit %d failed, id:0x%" PRIx64 ", slot idx:%d, type:%d", CTG_DEFAULT_RENT_SLOT_SIZE, id, slotIdx,
             mgmt->type);
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }
  }

  if (!taosArrayPush(pSlot->meta, meta)) {
    qError("taosArrayPush meta to rent failed, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  // New entries are appended unsorted; the next lookup has to re-sort.
  pSlot->needSort = true;

  qDebug("add meta to rent, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);

_return:

  CTG_UNLOCK(CTG_WRITE, &pSlot->lock);
  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1230 1231
/*
 * Overwrite the rent record matching `id` with the `size` bytes at `meta`.
 *
 * The slot is sorted with `sortCompare` first when it is flagged as unsorted,
 * then searched with `searchCompare`. When the slot is empty or the record is
 * not found, the record is added through ctgMetaRentAdd() instead and that
 * call's result is returned.
 */
int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t sortCompare,
                          __compar_fn_t searchCompare) {
  int32_t       code = 0;
  int16_t       slotIdx = abs((int)(id % mgmt->slotNum));
  SCtgRentSlot *pSlot = &mgmt->slots[slotIdx];

  CTG_LOCK(CTG_WRITE, &pSlot->lock);

  if (!pSlot->meta) {
    qDebug("empty meta slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  if (pSlot->needSort) {
    qDebug("meta slot before sorte, slot idx:%d, type:%d, size:%d", slotIdx, mgmt->type,
           (int32_t)taosArrayGetSize(pSlot->meta));
    taosArraySort(pSlot->meta, sortCompare);
    pSlot->needSort = false;
    qDebug("meta slot sorted, slot idx:%d, type:%d, size:%d", slotIdx, mgmt->type,
           (int32_t)taosArrayGetSize(pSlot->meta));
  }

  void *existing = taosArraySearch(pSlot->meta, &id, searchCompare, TD_EQ);
  if (!existing) {
    qDebug("meta not found in slot, id:0x%" PRIx64 ", slot idx:%d, type:%d, size:%d", id, slotIdx, mgmt->type,
           (int32_t)taosArrayGetSize(pSlot->meta));
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  // Replace the stored record in place.
  memcpy(existing, meta, size);

  qDebug("meta in rent updated, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);

_return:

  CTG_UNLOCK(CTG_WRITE, &pSlot->lock);

  if (code) {
    // The lock is already released; ctgMetaRentAdd() takes it again.
    qDebug("meta in rent update failed, will try to add it, code:%x, id:0x%" PRIx64 ", slot idx:%d, type:%d", code, id,
           slotIdx, mgmt->type);
    CTG_RET(ctgMetaRentAdd(mgmt, meta, id, size));
  }

  CTG_RET(code);
}

/*
 * Remove the rent record matching `id` from its slot.
 *
 * The slot is sorted with `sortCompare` first when flagged as unsorted, then
 * searched with `searchCompare`.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CTG_INTERNAL_ERROR when the slot has
 * no array or the record is not present.
 */
int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t sortCompare, __compar_fn_t searchCompare) {
  int32_t       code = 0;
  int16_t       slotIdx = abs((int)(id % mgmt->slotNum));
  SCtgRentSlot *pSlot = &mgmt->slots[slotIdx];

  CTG_LOCK(CTG_WRITE, &pSlot->lock);

  if (!pSlot->meta) {
    qError("empty meta slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  if (pSlot->needSort) {
    taosArraySort(pSlot->meta, sortCompare);
    pSlot->needSort = false;
    qDebug("meta slot sorted, slot idx:%d, type:%d", slotIdx, mgmt->type);
  }

  int32_t pos = taosArraySearchIdx(pSlot->meta, &id, searchCompare, TD_EQ);
  if (pos < 0) {
    qError("meta not found in slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  taosArrayRemove(pSlot->meta, pos);

  qDebug("meta in rent removed, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);

_return:

  CTG_UNLOCK(CTG_WRITE, &pSlot->lock);

  CTG_RET(code);
}

/*
 * Copy all records of the next rent slot into a freshly allocated buffer.
 *
 * The shared read index is advanced atomically and wrapped at slotNum. When
 * the selected slot has no records, *num is set to 0 and *res is left
 * untouched. Otherwise *res receives a malloc'ed copy of metaNum * size bytes
 * and *num the record count.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the copy buffer
 * cannot be allocated.
 */
int32_t ctgMetaRentGetImpl(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
  int32_t code = 0;
  int16_t slotIdx = atomic_add_fetch_16(&mgmt->slotRIdx, 1);

  if (slotIdx >= mgmt->slotNum) {
    // Wrap around and publish the wrapped index for the next reader.
    slotIdx %= mgmt->slotNum;
    atomic_store_16(&mgmt->slotRIdx, slotIdx);
  }

  SCtgRentSlot *pSlot = &mgmt->slots[slotIdx];

  CTG_LOCK(CTG_READ, &pSlot->lock);

  if (!pSlot->meta) {
    qDebug("empty meta in slot:%d, type:%d", slotIdx, mgmt->type);
    *num = 0;
    goto _return;
  }

  size_t count = taosArrayGetSize(pSlot->meta);
  if (count <= 0) {
    qDebug("no meta in slot:%d, type:%d", slotIdx, mgmt->type);
    *num = 0;
    goto _return;
  }

  size_t bytes = count * size;

  *res = taosMemoryMalloc(bytes);
  if (!*res) {
    qError("malloc %d failed", (int32_t)bytes);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  // Bulk copy starting at element 0.
  // NOTE(review): assumes the array stores its elements contiguously, `size`
  // bytes each - confirm against the array implementation.
  memcpy(*res, taosArrayGet(pSlot->meta, 0), bytes);

  *num = (uint32_t)count;

  qDebug("Got %d meta from rent, type:%d", (int32_t)count, mgmt->type);

_return:

  CTG_UNLOCK(CTG_READ, &pSlot->lock);

  CTG_RET(code);
}

/*
 * Rate-limited front end for ctgMetaRentGetImpl().
 *
 * When less than CTG_RENT_SLOT_SECOND seconds have passed since the last
 * successful read, *res is set to NULL, *num to 0 and TSDB_CODE_SUCCESS is
 * returned. Otherwise the last-read timestamp is claimed with a
 * compare-and-swap (retried when it changed in between) and the next slot is
 * fetched.
 */
int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
  for (;;) {
    int64_t nowMs = taosGetTimestampMs();
    int64_t lastMs = atomic_load_64(&mgmt->lastReadMsec);

    if ((nowMs - lastMs) < CTG_RENT_SLOT_SECOND * 1000) {
      *res = NULL;
      *num = 0;
      qDebug("too short time period to get expired meta, type:%d", mgmt->type);
      return TSDB_CODE_SUCCESS;
    }

    // The swap returns the previous value; equality means the claim succeeded.
    if (lastMs == atomic_val_compare_exchange_64(&mgmt->lastReadMsec, lastMs, nowMs)) {
      break;
    }
  }

  CTG_ERR_RET(ctgMetaRentGetImpl(mgmt, res, num, size));

  return TSDB_CODE_SUCCESS;
}

/*
 * Create an empty db cache entry (table hash + super-table hash) for `dbFName`
 * and insert it into pCtg->dbCache.
 *
 * For non-system dbs a version record with vgVersion -1 is also added to the
 * db rent. When the db is already cached, or the insert fails, the newly
 * built entry is released with ctgFreeDbCache() and the error code is
 * returned.
 */
int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
  int32_t     code = 0;
  SCtgDBCache newDBCache = {0};

  newDBCache.dbId = dbId;

  newDBCache.tbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY),
                                    true, HASH_ENTRY_LOCK);
  if (!newDBCache.tbCache) {
    ctgError("taosHashInit %d metaCache failed", gCtgMgmt.cfg.maxTblCacheNum);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  newDBCache.stbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT),
                                     true, HASH_ENTRY_LOCK);
  if (!newDBCache.stbCache) {
    ctgError("taosHashInit %d stbCache failed", gCtgMgmt.cfg.maxTblCacheNum);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  code = taosHashPut(pCtg->dbCache, dbFName, strlen(dbFName), &newDBCache, sizeof(SCtgDBCache));
  if (code) {
    if (HASH_NODE_EXIST(code)) {
      // The hash's own code is returned unchanged in this case.
      ctgDebug("db already in cache, dbFName:%s", dbFName);
      goto _return;
    }

    ctgError("taosHashPut db to cache failed, dbFName:%s", dbFName);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  CTG_CACHE_NUM_INC(CTG_CI_DB, 1);

  SDbVgVersion rentInfo = {.dbId = dbId, .vgVersion = -1, .stateTs = 0};
  tstrncpy(rentInfo.dbFName, dbFName, sizeof(rentInfo.dbFName));

  ctgDebug("db added to cache, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId);

  if (!IS_SYS_DBNAME(dbFName)) {
    // The entry already lives in the hash, so it is not freed when this fails.
    CTG_ERR_RET(ctgMetaRentAdd(&pCtg->dbRent, &rentInfo, dbId, sizeof(SDbVgVersion)));

    ctgDebug("db added to rent, dbFName:%s, vgVersion:%d, dbId:0x%" PRIx64, dbFName, rentInfo.vgVersion, dbId);
  }

  return TSDB_CODE_SUCCESS;

_return:

  ctgFreeDbCache(&newDBCache);

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1432
/*
 * Remove every super table of `dbCache` from the super-table rent.
 *
 * The keys of dbCache->stbCache are used as suids. Entries that cannot be
 * removed from the rent are skipped silently. Does nothing when the db has no
 * stbCache.
 */
void ctgRemoveStbRent(SCatalog *pCtg, SCtgDBCache *dbCache) {
  if (!dbCache->stbCache) {
    return;
  }

  for (void *pIter = taosHashIterate(dbCache->stbCache, NULL); NULL != pIter;
       pIter = taosHashIterate(dbCache->stbCache, pIter)) {
    uint64_t *suid = taosHashGetKey(pIter, NULL);

    int32_t rc = ctgMetaRentRemove(&pCtg->stbRent, *suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare);
    if (TSDB_CODE_SUCCESS == rc) {
      ctgDebug("stb removed from rent, suid:0x%" PRIx64, *suid);
    }
  }
}

dengyihao's avatar
dengyihao 已提交
1451
/*
 * Drop a db from the catalog cache.
 *
 * Under the db's write lock the entry is marked deleted, its super tables are
 * removed from the stb rent and its contents are freed. Afterwards the db is
 * removed from the db rent and from pCtg->dbCache.
 *
 * Returns TSDB_CODE_SUCCESS, the error from ctgMetaRentRemove(), or
 * TSDB_CODE_CTG_DB_DROPPED when the hash removal fails.
 */
int32_t ctgRemoveDBFromCache(SCatalog *pCtg, SCtgDBCache *dbCache, const char *dbFName) {
  // Keep the id in a local: it is still needed after the entry is freed.
  uint64_t dbId = dbCache->dbId;

  ctgInfo("start to remove db from cache, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId);

  CTG_LOCK(CTG_WRITE, &dbCache->dbLock);

  atomic_store_8(&dbCache->deleted, 1);
  ctgRemoveStbRent(pCtg, dbCache);
  ctgFreeDbCache(dbCache);

  CTG_UNLOCK(CTG_WRITE, &dbCache->dbLock);

  CTG_ERR_RET(ctgMetaRentRemove(&pCtg->dbRent, dbId, ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare));
  ctgDebug("db removed from rent, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId);

  if (0 != taosHashRemove(pCtg->dbCache, dbFName, strlen(dbFName))) {
    ctgInfo("taosHashRemove from dbCache failed, may be removed, dbFName:%s", dbFName);
    CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
  }

  CTG_CACHE_NUM_DEC(CTG_CI_DB, 1);
  ctgInfo("db removed from cache, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1478 1479
/*
 * Return the cache entry for `dbFName`, creating it when necessary.
 *
 * An existing entry is reused when the caller passes dbId 0, when the ids
 * match, or when the cached id is still 0 (in which case it adopts `dbId`).
 * An entry with a different non-zero id is removed and replaced by a fresh
 * one. The entry is handed back through *pCache.
 */
int32_t ctgGetAddDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId, SCtgDBCache **pCache) {
  int32_t      code = 0;
  SCtgDBCache *cached = NULL;

  ctgGetDBCache(pCtg, dbFName, &cached);

  if (NULL != cached) {
    // TODO OPEN IT
#if 0
    if (cached->dbId == dbId) {
      *pCache = cached;
      return TSDB_CODE_SUCCESS;
    }
#else
    bool reuse = (0 == dbId) || (cached->dbId == dbId);

    if (!reuse && (0 == cached->dbId)) {
      // The cached entry has no id yet: adopt the caller's.
      cached->dbId = dbId;
      reuse = true;
    }

    if (reuse) {
      *pCache = cached;
      return TSDB_CODE_SUCCESS;
    }
#endif
    // Same name but a different non-zero id: drop the cached entry.
    CTG_ERR_RET(ctgRemoveDBFromCache(pCtg, cached, dbFName));
  }

  CTG_ERR_RET(ctgAddNewDBCache(pCtg, dbFName, dbId));

  ctgGetDBCache(pCtg, dbFName, &cached);

  *pCache = cached;

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1519 1520
/*
 * Publish the versions held in a table cache entry to the catalog's stb rent.
 *
 * The rent item is keyed by suid. Schema/tag versions are taken from
 * pCache->pMeta and the sma version from pCache->pIndex; whichever of the two
 * is absent leaves its fields zero.
 *
 * Returns TSDB_CODE_SUCCESS, or the error from ctgMetaRentUpdate.
 */
int32_t ctgUpdateRentStbVersion(SCatalog *pCtg, char *dbFName, char *tbName, uint64_t dbId, uint64_t suid,
                                SCtgTbCache *pCache) {
  SSTableVersion rentItem = {.dbId = dbId, .suid = suid};

  tstrncpy(rentItem.dbFName, dbFName, sizeof(rentItem.dbFName));
  tstrncpy(rentItem.stbName, tbName, sizeof(rentItem.stbName));

  if (NULL != pCache->pMeta) {
    rentItem.sversion = pCache->pMeta->sversion;
    rentItem.tversion = pCache->pMeta->tversion;
  }

  if (NULL != pCache->pIndex) {
    rentItem.smaVer = pCache->pIndex->version;
  }

  CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->stbRent, &rentItem, rentItem.suid, sizeof(SSTableVersion),
                                ctgStbVersionSortCompare, ctgStbVersionSearchCompare));

  ctgDebug("db %s,0x%" PRIx64 " stb %s,0x%" PRIx64 " sver %d tver %d smaVer %d updated to stbRent", dbFName, dbId,
           tbName, suid, rentItem.sversion, rentItem.tversion, rentItem.smaVer);

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
1542

dengyihao's avatar
dengyihao 已提交
1543 1544
/*
 * Store a table meta in the db's table cache, taking ownership of `meta`.
 *
 * `meta` is freed here when the db is being dropped, when the cached meta is
 * already at least as new, or when inserting a new cache entry fails; otherwise
 * it becomes the entry's pMeta. For super tables the suid -> name mapping in
 * stbCache and the stb rent are refreshed as well.
 *
 * metaSize is accepted for interface compatibility and is not used here.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_CTG_DB_DROPPED, TSDB_CODE_OUT_OF_MEMORY,
 * or the error from ctgUpdateRentStbVersion.
 */
int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, uint64_t dbId, char *tbName,
                              STableMeta *meta, int32_t metaSize) {
  if (NULL == dbCache->tbCache || NULL == dbCache->stbCache) {
    taosMemoryFree(meta);
    ctgError("db is dropping, dbId:0x%" PRIx64, dbCache->dbId);
    CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
  }

  bool         isStb = meta->tableType == TSDB_SUPER_TABLE;
  SCtgTbCache *pCache = taosHashGet(dbCache->tbCache, tbName, strlen(tbName));
  STableMeta  *orig = (pCache ? pCache->pMeta : NULL);
  int8_t       origType = 0;

  if (orig) {
    origType = orig->tableType;

    // same table and nothing newer to store: child tables never change, others compare versions
    if (origType == meta->tableType && orig->uid == meta->uid &&
        (origType == TSDB_CHILD_TABLE || (orig->sversion >= meta->sversion && orig->tversion >= meta->tversion))) {
      taosMemoryFree(meta);
      ctgDebug("ignore table %s meta update", tbName);
      return TSDB_CODE_SUCCESS;
    }

    // the old super table mapping is dropped; a new one is added below if meta is a super table
    if (origType == TSDB_SUPER_TABLE) {
      if (taosHashRemove(dbCache->stbCache, &orig->suid, sizeof(orig->suid))) {
        ctgError("stb not exist in stbCache, dbFName:%s, stb:%s, suid:0x%" PRIx64, dbFName, tbName, orig->suid);
      } else {
        ctgDebug("stb removed from stbCache, dbFName:%s, stb:%s, suid:0x%" PRIx64, dbFName, tbName, orig->suid);
      }
    }
  }

  if (NULL == pCache) {
    SCtgTbCache cache = {0};
    cache.pMeta = meta;
    if (taosHashPut(dbCache->tbCache, tbName, strlen(tbName), &cache, sizeof(SCtgTbCache)) != 0) {
      ctgError("taosHashPut new tbCache failed, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
      taosMemoryFree(meta);
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    // may be NULL if the lookup misses; every use below tolerates that
    pCache = taosHashGet(dbCache->tbCache, tbName, strlen(tbName));
  } else {
    CTG_LOCK(CTG_WRITE, &pCache->metaLock);
    if (orig) {
      CTG_META_NUM_DEC(origType);
    }
    taosMemoryFree(pCache->pMeta);
    pCache->pMeta = meta;
    CTG_UNLOCK(CTG_WRITE, &pCache->metaLock);
  }

  // meta is what the entry now holds; reading it directly avoids dereferencing
  // a possibly-NULL pCache returned by the re-lookup above
  CTG_META_NUM_INC(meta->tableType);

  ctgDebug("tbmeta updated to cache, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
  ctgdShowTableMeta(pCtg, tbName, meta);

  if (!isStb) {
    return TSDB_CODE_SUCCESS;
  }

  if (taosHashPut(dbCache->stbCache, &meta->suid, sizeof(meta->suid), tbName, strlen(tbName) + 1) != 0) {
    ctgError("taosHashPut to stable cache failed, suid:0x%" PRIx64, meta->suid);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  ctgDebug("stb 0x%" PRIx64 " updated to cache, dbFName:%s, tbName:%s, tbType:%d", meta->suid, dbFName, tbName,
           meta->tableType);

  if (pCache) {
    CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbId, meta->suid, pCache));
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1619
/*
 * Store a table index in the db's table cache, taking ownership of *index.
 *
 * On every path *index ends up NULL: either the index was freed (db dropping,
 * insert failure) or it now belongs to the cache entry. When a suid is known,
 * from the new index or failing that from the index being replaced, the stb
 * rent is refreshed too.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_CTG_DB_DROPPED, TSDB_CODE_OUT_OF_MEMORY,
 * or the error from ctgUpdateRentStbVersion.
 */
int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, char *tbName, STableIndex **index) {
  if (NULL == dbCache->tbCache) {
    ctgFreeSTableIndex(*index);
    taosMemoryFreeClear(*index);
    ctgError("db is dropping, dbId:0x%" PRIx64, dbCache->dbId);
    CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
  }

  STableIndex *pIndex = *index;
  uint64_t     suid = pIndex->suid;
  size_t       nameLen = strlen(tbName);
  SCtgTbCache *pCache = taosHashGet(dbCache->tbCache, tbName, nameLen);

  if (NULL != pCache) {
    // entry exists: swap the index in place under the index lock
    CTG_LOCK(CTG_WRITE, &pCache->indexLock);

    if (pCache->pIndex) {
      if (0 == suid) {
        // keep the suid of the index being replaced for the rent update below
        suid = pCache->pIndex->suid;
      }
      taosArrayDestroyEx(pCache->pIndex->pIndex, tFreeSTableIndexInfo);
      taosMemoryFreeClear(pCache->pIndex);
    }

    pCache->pIndex = pIndex;
    CTG_UNLOCK(CTG_WRITE, &pCache->indexLock);

    *index = NULL;

    ctgDebug("table %s index updated to cache, ver:%d, num:%d", tbName, pIndex->version,
             (int32_t)taosArrayGetSize(pIndex->pIndex));

    if (suid) {
      CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbCache->dbId, suid, pCache));
    }

    return TSDB_CODE_SUCCESS;
  }

  // no entry yet: insert one that carries only the index
  SCtgTbCache cache = {0};
  cache.pIndex = pIndex;

  if (taosHashPut(dbCache->tbCache, tbName, nameLen, &cache, sizeof(cache)) != 0) {
    ctgFreeSTableIndex(*index);
    taosMemoryFreeClear(*index);
    ctgError("taosHashPut new tbCache failed, tbName:%s", tbName);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  CTG_DB_NUM_INC(CTG_CI_TBL_SMA);

  *index = NULL;
  ctgDebug("table %s index updated to cache, ver:%d, num:%d", tbName, pIndex->version,
           (int32_t)taosArrayGetSize(pIndex->pIndex));

  if (suid) {
    CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbCache->dbId, pIndex->suid, &cache));
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1679 1680 1681 1682
/*
 * Queue a table meta update for the cache.
 *
 * pOut is cloned and the clone is passed to ctgUpdateTbMetaEnqueue. The local
 * pointer to the clone is cleared right after that call, whatever its result,
 * so the cleanup path below never frees the clone itself.
 *
 * Returns TSDB_CODE_SUCCESS, or the clone/enqueue error.
 */
int32_t ctgUpdateTbMetaToCache(SCatalog *pCtg, STableMetaOutput *pOut, bool syncReq) {
  int32_t           code = 0;
  STableMetaOutput *pClone = NULL;

  CTG_ERR_RET(ctgCloneMetaOutput(pOut, &pClone));

  code = ctgUpdateTbMetaEnqueue(pCtg, pClone, syncReq);
  pClone = NULL;  // handed to the enqueue call above
  CTG_ERR_JRET(code);

  return TSDB_CODE_SUCCESS;

_return:

  ctgFreeSTableMetaOutput(pClone);
  CTG_RET(code);
}

D
dapan1121 已提交
1696
void ctgClearAllInstance(void) {
dengyihao's avatar
dengyihao 已提交
1697
  SCatalog *pCtg = NULL;
1698

dengyihao's avatar
dengyihao 已提交
1699
  void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
1700
  while (pIter) {
dengyihao's avatar
dengyihao 已提交
1701
    pCtg = *(SCatalog **)pIter;
1702 1703

    if (pCtg) {
D
dapan1121 已提交
1704 1705 1706 1707 1708 1709 1710 1711
      ctgClearHandle(pCtg);
    }

    pIter = taosHashIterate(gCtgMgmt.pCluster, pIter);
  }
}

void ctgFreeAllInstance(void) {
dengyihao's avatar
dengyihao 已提交
1712
  SCatalog *pCtg = NULL;
D
dapan1121 已提交
1713

dengyihao's avatar
dengyihao 已提交
1714
  void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
D
dapan1121 已提交
1715
  while (pIter) {
dengyihao's avatar
dengyihao 已提交
1716
    pCtg = *(SCatalog **)pIter;
D
dapan1121 已提交
1717 1718 1719

    if (pCtg) {
      ctgFreeHandle(pCtg);
1720 1721 1722 1723 1724 1725 1726 1727
    }

    pIter = taosHashIterate(gCtgMgmt.pCluster, pIter);
  }

  taosHashClear(gCtgMgmt.pCluster);
}

1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741
/*
 * Comparator between a vgroup id key and an SVgroupInfo element.
 *
 * lp points to an int32_t vgroup id, rp to an SVgroupInfo.
 * Returns -1, 0 or 1 when the key is below, equal to or above rp's vgId.
 */
int32_t ctgVgInfoIdComp(void const* lp, void const* rp) {
  int32_t      vgId = *(int32_t*)lp;
  SVgroupInfo* pVg = (SVgroupInfo*)rp;

  if (vgId == pVg->vgId) {
    return 0;
  }

  return (vgId < pVg->vgId) ? -1 : 1;
}


D
dapan1121 已提交
1742
/*
 * Cache operation: install a new vgroup info for a db.
 *
 * The message and, unless it was moved into the cache, its dbInfo are released
 * before returning. The update is skipped when the catalog stopped updating,
 * when dbInfo has no vgHash, or when the cached vgInfo is newer or carries the
 * same version, table count and state timestamp. After a successful install
 * the db rent is refreshed from a snapshot taken before ownership moved.
 */
int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
  int32_t          code = 0;
  SCtgUpdateVgMsg *msg = operation->data;
  SDBVgInfo       *dbInfo = msg->dbInfo;
  char            *dbFName = msg->dbFName;
  SCatalog        *pCtg = msg->pCtg;

  if (pCtg->stopUpdate || NULL == dbInfo->vgHash) {
    goto _return;
  }

  if (dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) {
    ctgDebug("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d, vgHashSize:%d", dbFName, dbInfo->vgHash,
             dbInfo->vgVersion, taosHashGetSize(dbInfo->vgHash));
    CTG_ERR_JRET(TSDB_CODE_APP_ERROR);
  }

  // snapshot for the rent update; dbInfo itself is handed to the cache below
  SDbVgVersion rentItem = {
      .dbId = msg->dbId, .vgVersion = dbInfo->vgVersion, .numOfTable = dbInfo->numOfTable, .stateTs = dbInfo->stateTs};

  SCtgDBCache *dbCache = NULL;
  CTG_ERR_JRET(ctgGetAddDBCache(pCtg, dbFName, msg->dbId, &dbCache));
  if (NULL == dbCache) {
    ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:0x%" PRIx64, dbFName, msg->dbId);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  SCtgVgCache *vgCache = &dbCache->vgCache;
  CTG_ERR_JRET(ctgWLockVgInfo(pCtg, dbCache));

  if (vgCache->vgInfo) {
    SDBVgInfo *pCur = vgCache->vgInfo;
    bool       skip = false;

    if (dbInfo->vgVersion < pCur->vgVersion) {
      ctgDebug("db updateVgroup is ignored, dbFName:%s, vgVer:%d, curVer:%d", dbFName, dbInfo->vgVersion,
               pCur->vgVersion);
      skip = true;
    } else if (dbInfo->vgVersion == pCur->vgVersion && dbInfo->numOfTable == pCur->numOfTable &&
               dbInfo->stateTs == pCur->stateTs) {
      ctgDebug("no new db vgroup update info, dbFName:%s, vgVer:%d, numOfTable:%d, stateTs:%" PRId64, dbFName,
               dbInfo->vgVersion, dbInfo->numOfTable, dbInfo->stateTs);
      skip = true;
    }

    if (skip) {
      ctgWUnlockVgInfo(dbCache);
      goto _return;
    }

    freeVgInfo(pCur);
    CTG_DB_NUM_RESET(CTG_CI_DB_VGROUP);
  }

  vgCache->vgInfo = dbInfo;
  msg->dbInfo = NULL;  // owned by the cache now; keeps _return from freeing it
  CTG_DB_NUM_SET(CTG_CI_DB_VGROUP);

  ctgDebug("db vgInfo updated, dbFName:%s, vgVer:%d, stateTs:%" PRId64 ", dbId:0x%" PRIx64, dbFName,
           rentItem.vgVersion, rentItem.stateTs, rentItem.dbId);

  ctgWUnlockVgInfo(dbCache);

  dbCache = NULL;

  tstrncpy(rentItem.dbFName, dbFName, sizeof(rentItem.dbFName));
  CTG_ERR_JRET(ctgMetaRentUpdate(&pCtg->dbRent, &rentItem, rentItem.dbId, sizeof(SDbVgVersion),
                                 ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare));

_return:

  freeVgInfo(msg->dbInfo);
  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
1822
/*
 * Cache operation: remove a db from the cache.
 *
 * Nothing is removed when the catalog stopped updating, when the db is not
 * cached, or when the message names a non-zero dbId that differs from the
 * cached one. The message is freed on every path.
 */
int32_t ctgOpDropDbCache(SCtgCacheOperation *operation) {
  int32_t        code = 0;
  SCtgDropDBMsg *msg = operation->data;
  SCatalog      *pCtg = msg->pCtg;
  SCtgDBCache   *dbCache = NULL;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  ctgGetDBCache(pCtg, msg->dbFName, &dbCache);
  if (NULL == dbCache) {
    goto _return;
  }

  if (msg->dbId && dbCache->dbId != msg->dbId) {
    ctgInfo("dbId already updated, dbFName:%s, dbId:0x%" PRIx64 ", targetId:0x%" PRIx64, msg->dbFName, dbCache->dbId,
            msg->dbId);
    goto _return;
  }

  CTG_ERR_JRET(ctgRemoveDBFromCache(pCtg, dbCache, msg->dbFName));

_return:

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
1852
/*
 * Cache operation: discard the cached vgroup info of a db.
 *
 * The db entry itself stays in the cache; only vgCache.vgInfo is freed and
 * cleared, under the vgInfo write lock. The message is freed on every path.
 */
int32_t ctgOpDropDbVgroup(SCtgCacheOperation *operation) {
  int32_t              code = 0;
  SCtgDropDbVgroupMsg *msg = operation->data;
  SCatalog            *pCtg = msg->pCtg;
  SCtgDBCache         *dbCache = NULL;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  ctgGetDBCache(pCtg, msg->dbFName, &dbCache);
  if (NULL == dbCache) {
    goto _return;
  }

  CTG_ERR_JRET(ctgWLockVgInfo(pCtg, dbCache));

  freeVgInfo(dbCache->vgCache.vgInfo);
  dbCache->vgCache.vgInfo = NULL;

  CTG_DB_NUM_RESET(CTG_CI_DB_VGROUP);
  ctgDebug("db vgInfo removed, dbFName:%s", msg->dbFName);

  ctgWUnlockVgInfo(dbCache);

_return:

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
1884
/*
 * Cache operation: write the table meta(s) carried by the message into the cache.
 *
 * TABLE and BOTH outputs store pMeta->tbMeta under tbName; CTABLE and BOTH
 * outputs store a heap copy of pMeta->ctbMeta under ctbName. tbMeta is passed
 * to ctgWriteTbMetaToCache and the local reference is cleared right after the
 * call, so the cleanup path does not free it again. The output struct and the
 * message are freed on every path.
 */
int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *operation) {
  int32_t              code = 0;
  SCtgUpdateTbMetaMsg *msg = operation->data;
  SCatalog            *pCtg = msg->pCtg;
  STableMetaOutput    *pMeta = msg->pMeta;
  SCtgDBCache         *dbCache = NULL;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  if ((!CTG_IS_META_CTABLE(pMeta->metaType)) && NULL == pMeta->tbMeta) {
    ctgError("no valid tbmeta got from meta rsp, dbFName:%s, tbName:%s", pMeta->dbFName, pMeta->tbName);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  if (CTG_IS_META_BOTH(pMeta->metaType) && TSDB_SUPER_TABLE != pMeta->tbMeta->tableType) {
    ctgError("table type error, expected:%d, actual:%d", TSDB_SUPER_TABLE, pMeta->tbMeta->tableType);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  CTG_ERR_JRET(ctgGetAddDBCache(pCtg, pMeta->dbFName, pMeta->dbId, &dbCache));
  if (NULL == dbCache) {
    ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:0x%" PRIx64, pMeta->dbFName, pMeta->dbId);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  if (CTG_IS_META_TABLE(pMeta->metaType) || CTG_IS_META_BOTH(pMeta->metaType)) {
    code = ctgWriteTbMetaToCache(pCtg, dbCache, pMeta->dbFName, pMeta->dbId, pMeta->tbName, pMeta->tbMeta,
                                 CTG_META_SIZE(pMeta->tbMeta));
    pMeta->tbMeta = NULL;  // consumed by the call above, success or not
    CTG_ERR_JRET(code);
  }

  if (CTG_IS_META_CTABLE(pMeta->metaType) || CTG_IS_META_BOTH(pMeta->metaType)) {
    SCTableMeta *pChild = taosMemoryMalloc(sizeof(SCTableMeta));
    if (NULL == pChild) {
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }

    memcpy(pChild, &pMeta->ctbMeta, sizeof(SCTableMeta));
    CTG_ERR_JRET(ctgWriteTbMetaToCache(pCtg, dbCache, pMeta->dbFName, pMeta->dbId, pMeta->ctbName,
                                       (STableMeta *)pChild, sizeof(SCTableMeta)));
  }

_return:

  taosMemoryFreeClear(pMeta->tbMeta);
  taosMemoryFreeClear(pMeta);

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
1938
/*
 * Cache operation: drop a super table from the cache and from the stb rent.
 *
 * The suid mapping is removed from stbCache, the table entry's contents are
 * released under its meta lock, the entry is removed from tbCache and finally
 * the suid is removed from the stb rent. Nothing happens when the catalog
 * stopped updating, the db is not cached, or the message names a non-zero dbId
 * that differs from the cached one. The message is freed on every path.
 */
int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) {
  int32_t             code = 0;
  SCtgDropStbMetaMsg *msg = operation->data;
  SCatalog           *pCtg = msg->pCtg;
  int32_t             tblType = 0;
  bool                hasMeta = false;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  SCtgDBCache *dbCache = NULL;
  ctgGetDBCache(pCtg, msg->dbFName, &dbCache);
  if (NULL == dbCache) {
    goto _return;
  }

  if (msg->dbId && (dbCache->dbId != msg->dbId)) {
    ctgDebug("dbId already modified, dbFName:%s, current:0x%" PRIx64 ", dbId:0x%" PRIx64 ", stb:%s, suid:0x%" PRIx64,
             msg->dbFName, dbCache->dbId, msg->dbId, msg->stbName, msg->suid);
    goto _return;
  }

  if (taosHashRemove(dbCache->stbCache, &msg->suid, sizeof(msg->suid))) {
    ctgDebug("stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:0x%" PRIx64, msg->dbFName,
             msg->stbName, msg->suid);
  }

  SCtgTbCache *pTbCache = taosHashGet(dbCache->tbCache, msg->stbName, strlen(msg->stbName));
  if (NULL == pTbCache) {
    ctgDebug("stb %s already not in cache", msg->stbName);
    goto _return;
  }

  CTG_LOCK(CTG_WRITE, &pTbCache->metaLock);
  // an entry created by an index update carries no meta, so pMeta may be NULL here
  hasMeta = (NULL != pTbCache->pMeta);
  if (hasMeta) {
    tblType = pTbCache->pMeta->tableType;
  }
  ctgFreeTbCacheImpl(pTbCache);
  CTG_UNLOCK(CTG_WRITE, &pTbCache->metaLock);

  if (taosHashRemove(dbCache->tbCache, msg->stbName, strlen(msg->stbName))) {
    ctgError("stb not exist in cache, dbFName:%s, stb:%s, suid:0x%" PRIx64, msg->dbFName, msg->stbName, msg->suid);
  } else if (hasMeta) {
    // only a meta that was actually cached was counted
    CTG_META_NUM_DEC(tblType);
  }

  ctgInfo("stb removed from cache, dbFName:%s, stbName:%s, suid:0x%" PRIx64, msg->dbFName, msg->stbName, msg->suid);

  CTG_ERR_JRET(ctgMetaRentRemove(&msg->pCtg->stbRent, msg->suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare));

  ctgDebug("stb removed from rent, dbFName:%s, stbName:%s, suid:0x%" PRIx64, msg->dbFName, msg->stbName, msg->suid);

_return:

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
1995
/*
 * Cache operation: drop a table from the db's table cache.
 *
 * The entry's contents are released under its meta lock and the entry is then
 * removed from tbCache. Nothing happens when the catalog stopped updating, the
 * db is not cached, the dbId in the message differs from the cached one, or
 * the table is not cached. The message is freed on every path.
 *
 * Returns TSDB_CODE_CTG_INTERNAL_ERROR (through CTG_RET) when the entry cannot
 * be removed from tbCache.
 */
int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) {
  int32_t             code = 0;
  SCtgDropTblMetaMsg *msg = operation->data;
  SCatalog           *pCtg = msg->pCtg;
  int32_t             tblType = 0;
  bool                hasMeta = false;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  SCtgDBCache *dbCache = NULL;
  ctgGetDBCache(pCtg, msg->dbFName, &dbCache);
  if (NULL == dbCache) {
    goto _return;
  }

  if (dbCache->dbId != msg->dbId) {
    ctgDebug("dbId 0x%" PRIx64 " not match with curId 0x%" PRIx64 ", dbFName:%s, tbName:%s", msg->dbId, dbCache->dbId,
             msg->dbFName, msg->tbName);
    goto _return;
  }

  SCtgTbCache *pTbCache = taosHashGet(dbCache->tbCache, msg->tbName, strlen(msg->tbName));
  if (NULL == pTbCache) {
    ctgDebug("tb %s already not in cache", msg->tbName);
    goto _return;
  }

  CTG_LOCK(CTG_WRITE, &pTbCache->metaLock);
  // an entry created by an index update carries no meta, so pMeta may be NULL here
  hasMeta = (NULL != pTbCache->pMeta);
  if (hasMeta) {
    tblType = pTbCache->pMeta->tableType;
  }
  ctgFreeTbCacheImpl(pTbCache);
  CTG_UNLOCK(CTG_WRITE, &pTbCache->metaLock);

  if (taosHashRemove(dbCache->tbCache, msg->tbName, strlen(msg->tbName))) {
    ctgError("tb %s not exist in cache, dbFName:%s", msg->tbName, msg->dbFName);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  } else if (hasMeta) {
    // only a meta that was actually cached was counted
    CTG_META_NUM_DEC(tblType);
  }

  ctgDebug("table %s removed from cache, dbFName:%s", msg->tbName, msg->dbFName);

_return:

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
2044
/*
 * Cache operation: add or refresh a user's auth info in the user cache.
 *
 * For a new user an entry is inserted that takes over the three db hash sets
 * from the message. For an existing user the sets are swapped in under the
 * user lock and the superUser flag is refreshed too, so the entry matches what
 * a freshly created one would contain. Hash sets still referenced by the
 * message at cleanup time are destroyed, and the message is freed on every path.
 */
int32_t ctgOpUpdateUser(SCtgCacheOperation *operation) {
  int32_t            code = 0;
  SCtgUpdateUserMsg *msg = operation->data;
  SCatalog          *pCtg = msg->pCtg;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  SCtgUserAuth *pUser = (SCtgUserAuth *)taosHashGet(pCtg->userCache, msg->userAuth.user, strlen(msg->userAuth.user));
  if (NULL == pUser) {
    SCtgUserAuth userAuth = {0};

    userAuth.version = msg->userAuth.version;
    userAuth.superUser = msg->userAuth.superAuth;
    userAuth.createdDbs = msg->userAuth.createdDbs;
    userAuth.readDbs = msg->userAuth.readDbs;
    userAuth.writeDbs = msg->userAuth.writeDbs;

    if (taosHashPut(pCtg->userCache, msg->userAuth.user, strlen(msg->userAuth.user), &userAuth, sizeof(userAuth))) {
      ctgError("taosHashPut user %s to cache failed", msg->userAuth.user);
      // the message still references the hash sets, so _return destroys them
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }

    // the hash sets now belong to the cache entry; only the message shell is freed
    taosMemoryFreeClear(msg);

    CTG_CACHE_NUM_INC(CTG_CI_USER, 1);

    return TSDB_CODE_SUCCESS;
  }

  pUser->version = msg->userAuth.version;

  CTG_LOCK(CTG_WRITE, &pUser->lock);

  // keep the flag in step with the create path above
  pUser->superUser = msg->userAuth.superAuth;

  taosHashCleanup(pUser->createdDbs);
  pUser->createdDbs = msg->userAuth.createdDbs;
  msg->userAuth.createdDbs = NULL;

  taosHashCleanup(pUser->readDbs);
  pUser->readDbs = msg->userAuth.readDbs;
  msg->userAuth.readDbs = NULL;

  taosHashCleanup(pUser->writeDbs);
  pUser->writeDbs = msg->userAuth.writeDbs;
  msg->userAuth.writeDbs = NULL;

  CTG_UNLOCK(CTG_WRITE, &pUser->lock);

_return:

  taosHashCleanup(msg->userAuth.createdDbs);
  taosHashCleanup(msg->userAuth.readDbs);
  taosHashCleanup(msg->userAuth.writeDbs);

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
2104
/*
 * Cache operation: replace the endpoint set of one cached vgroup.
 *
 * The vgroup must be present both in vgHash and in vgArray of the db's cached
 * vgInfo; both copies receive the new epSet. The update is silently skipped
 * when the db, its vgInfo or the vgroup is not cached. The message is freed on
 * every path.
 */
int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation) {
  int32_t             code = 0;
  SCtgUpdateEpsetMsg *msg = operation->data;
  SCatalog           *pCtg = msg->pCtg;
  SCtgDBCache        *dbCache = NULL;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  CTG_ERR_JRET(ctgGetDBCache(pCtg, msg->dbFName, &dbCache));
  if (NULL == dbCache) {
    ctgDebug("db %s not exist, ignore epset update", msg->dbFName);
    goto _return;
  }

  CTG_ERR_JRET(ctgWLockVgInfo(pCtg, dbCache));

  SDBVgInfo *vgInfo = dbCache->vgCache.vgInfo;
  if (NULL == vgInfo) {
    ctgDebug("vgroup in db %s not cached, ignore epset update", msg->dbFName);
    goto _return;
  }

  SVgroupInfo *pHashVg = taosHashGet(vgInfo->vgHash, &msg->vgId, sizeof(msg->vgId));
  if (NULL == pHashVg) {
    ctgDebug("no vgroup %d in db %s vgHash, ignore epset update", msg->vgId, msg->dbFName);
    goto _return;
  }

  SVgroupInfo *pArrayVg = taosArraySearch(vgInfo->vgArray, &msg->vgId, ctgVgInfoIdComp, TD_EQ);
  if (NULL == pArrayVg) {
    ctgDebug("no vgroup %d in db %s vgArray, ignore epset update", msg->vgId, msg->dbFName);
    goto _return;
  }

  SEp *pOrigEp = &pHashVg->epSet.eps[pHashVg->epSet.inUse];
  SEp *pNewEp = &msg->epSet.eps[msg->epSet.inUse];
  ctgDebug("vgroup %d epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d, dbFName:%s in ctg", pHashVg->vgId,
           pHashVg->epSet.inUse, pHashVg->epSet.numOfEps, pOrigEp->fqdn, pOrigEp->port, msg->epSet.inUse,
           msg->epSet.numOfEps, pNewEp->fqdn, pNewEp->port, msg->dbFName);

  pHashVg->epSet = msg->epSet;
  pArrayVg->epSet = msg->epSet;

_return:

  // a zero code with a db entry means the vgInfo lock was taken above:
  // the only failures past the lookup are the lookup itself and the lock call
  if (code == TSDB_CODE_SUCCESS && dbCache) {
    ctgWUnlockVgInfo(dbCache);
  }

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
2160
/*
 * Cache operation: write a table index into the cache.
 *
 * The db entry is looked up (or created) with dbId 0, i.e. any cached id is
 * accepted. ctgWriteTbIndexToCache clears pIndex once it has consumed the
 * index; anything still referenced at cleanup time is destroyed here. The
 * message is freed on every path.
 */
int32_t ctgOpUpdateTbIndex(SCtgCacheOperation *operation) {
  int32_t               code = 0;
  SCtgUpdateTbIndexMsg *msg = operation->data;
  SCatalog             *pCtg = msg->pCtg;
  STableIndex          *pIndex = msg->pIndex;
  SCtgDBCache          *dbCache = NULL;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  CTG_ERR_JRET(ctgGetAddDBCache(pCtg, pIndex->dbFName, 0, &dbCache));
  // a successful call may still leave dbCache NULL; the other callers of
  // ctgGetAddDBCache in this file check for it, and the writer below would
  // dereference it
  if (NULL == dbCache) {
    ctgInfo("conflict db update, ignore this update, dbFName:%s", pIndex->dbFName);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  CTG_ERR_JRET(ctgWriteTbIndexToCache(pCtg, dbCache, pIndex->dbFName, pIndex->tbName, &pIndex));

_return:

  if (pIndex) {
    taosArrayDestroyEx(pIndex->pIndex, tFreeSTableIndexInfo);
    taosMemoryFreeClear(pIndex);
  }

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

/*
 * Cache operation: drop a table's index from the cache.
 *
 * The drop is done by writing an empty placeholder index (version -1, no index
 * array) over the cached one through ctgWriteTbIndexToCache. Nothing happens
 * when the catalog stopped updating or the db is not cached. The message is
 * freed on every path.
 */
int32_t ctgOpDropTbIndex(SCtgCacheOperation *operation) {
  int32_t             code = 0;
  SCtgDropTbIndexMsg *msg = operation->data;
  SCatalog           *pCtg = msg->pCtg;
  SCtgDBCache        *dbCache = NULL;
  // declared before any goto so the cleanup code never reads an indeterminate pointer
  STableIndex        *pIndex = NULL;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  CTG_ERR_JRET(ctgGetDBCache(pCtg, msg->dbFName, &dbCache));
  if (NULL == dbCache) {
    // leave through the common exit so the message is released
    goto _return;
  }

  pIndex = taosMemoryCalloc(1, sizeof(STableIndex));
  if (NULL == pIndex) {
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }
  tstrncpy(pIndex->tbName, msg->tbName, sizeof(pIndex->tbName));
  tstrncpy(pIndex->dbFName, msg->dbFName, sizeof(pIndex->dbFName));
  pIndex->version = -1;

  CTG_ERR_JRET(ctgWriteTbIndexToCache(pCtg, dbCache, pIndex->dbFName, pIndex->tbName, &pIndex));

_return:

  if (pIndex) {
    taosArrayDestroyEx(pIndex->pIndex, tFreeSTableIndexInfo);
    taosMemoryFreeClear(pIndex);
  }

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
2224
/*
 * Cache operation: clear or free catalog handles.
 *
 * With a catalog in the message only that handle is affected, otherwise every
 * instance is. msg->freeCtg selects freeing over clearing. The work runs under
 * the gCtgMgmt write lock and the message is freed afterwards.
 */
int32_t ctgOpClearCache(SCtgCacheOperation *operation) {
  int32_t            code = 0;
  SCtgClearCacheMsg *msg = operation->data;
  SCatalog          *pCtg = msg->pCtg;

  CTG_LOCK(CTG_WRITE, &gCtgMgmt.lock);

  if (NULL == pCtg) {
    if (msg->freeCtg) {
      ctgFreeAllInstance();
    } else {
      ctgClearAllInstance();
    }
  } else if (msg->freeCtg) {
    ctgFreeHandle(pCtg);
  } else {
    ctgClearHandle(pCtg);
  }

  CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.lock);

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

2256 2257 2258 2259 2260 2261 2262 2263
/*
 * Release the payload (op->data) of a cache operation without executing it.
 *
 * The payload type is selected by op->opId; nested allocations owned by the
 * message are released first, then the message itself. The operation object
 * `op` is NOT freed here. A NULL op or NULL op->data is a no-op; an unknown
 * opId is logged and the payload is left untouched.
 */
void ctgFreeCacheOperationData(SCtgCacheOperation *op) {
  if (NULL == op || NULL == op->data) {
    return;
  }

  switch (op->opId) {
    case CTG_OP_UPDATE_VGROUP: {
      SCtgUpdateVgMsg *msg = op->data;
      freeVgInfo(msg->dbInfo);
      taosMemoryFreeClear(op->data);
      break;
    }
    case CTG_OP_UPDATE_TB_META: {
      SCtgUpdateTbMetaMsg *msg = op->data;
      // pMeta may be absent; guard before reaching through it (same pattern as
      // the pIndex guard in CTG_OP_UPDATE_TB_INDEX below)
      if (msg->pMeta) {
        taosMemoryFreeClear(msg->pMeta->tbMeta);
        taosMemoryFreeClear(msg->pMeta);
      }
      taosMemoryFreeClear(op->data);
      break;
    }
    // messages that own no nested allocations
    case CTG_OP_DROP_DB_CACHE:
    case CTG_OP_DROP_DB_VGROUP:
    case CTG_OP_DROP_STB_META:
    case CTG_OP_DROP_TB_META:
    case CTG_OP_UPDATE_VG_EPSET:
    case CTG_OP_DROP_TB_INDEX:
    case CTG_OP_CLEAR_CACHE: {
      taosMemoryFreeClear(op->data);
      break;
    }
    case CTG_OP_UPDATE_USER: {
      SCtgUpdateUserMsg *msg = op->data;
      taosHashCleanup(msg->userAuth.createdDbs);
      taosHashCleanup(msg->userAuth.readDbs);
      taosHashCleanup(msg->userAuth.writeDbs);
      taosMemoryFreeClear(op->data);
      break;
    }
    case CTG_OP_UPDATE_TB_INDEX: {
      SCtgUpdateTbIndexMsg *msg = op->data;
      if (msg->pIndex) {
        taosArrayDestroyEx(msg->pIndex->pIndex, tFreeSTableIndexInfo);
        taosMemoryFreeClear(msg->pIndex);
      }
      taosMemoryFreeClear(op->data);
      break;
    }
    default: {
      qError("invalid cache op id:%d", op->opId);
      break;
    }
  }
}

D
dapan1121 已提交
2309
void ctgCleanupCacheQueue(void) {
dengyihao's avatar
dengyihao 已提交
2310 2311
  SCtgQNode          *node = NULL;
  SCtgQNode          *nodeNext = NULL;
D
dapan1121 已提交
2312
  SCtgCacheOperation *op = NULL;
dengyihao's avatar
dengyihao 已提交
2313
  bool                stopQueue = false;
D
dapan1121 已提交
2314 2315 2316 2317 2318

  while (true) {
    node = gCtgMgmt.queue.head->next;
    while (node) {
      if (node->op) {
D
dapan1121 已提交
2319 2320 2321 2322 2323 2324
        op = node->op;
        if (op->stopQueue) {
          SCatalog *pCtg = ((SCtgUpdateMsgHeader *)op->data)->pCtg;
          ctgDebug("process [%s] operation", gCtgCacheOperation[op->opId].name);
          (*gCtgCacheOperation[op->opId].func)(op);
          stopQueue = true;
D
dapan1121 已提交
2325
          CTG_STAT_RT_INC(numOfOpDequeue, 1);
D
dapan1121 已提交
2326
        } else {
2327
          ctgFreeCacheOperationData(op);
D
dapan1121 已提交
2328
          CTG_STAT_RT_INC(numOfOpAbort, 1);
D
dapan1121 已提交
2329
        }
dengyihao's avatar
dengyihao 已提交
2330

D
dapan1121 已提交
2331 2332
        if (op->syncOp) {
          tsem_post(&op->rspSem);
D
dapan1121 已提交
2333
        } else {
D
dapan1121 已提交
2334
          taosMemoryFree(op);
D
dapan1121 已提交
2335
        }
D
dapan1121 已提交
2336
      }
D
dapan1121 已提交
2337 2338 2339

      nodeNext = node->next;
      taosMemoryFree(node);
dengyihao's avatar
dengyihao 已提交
2340

D
dapan1121 已提交
2341
      node = nodeNext;
D
dapan1121 已提交
2342 2343
    }

D
dapan1121 已提交
2344
    if (!stopQueue) {
D
dapan1121 已提交
2345 2346 2347 2348
      taosUsleep(1);
    } else {
      break;
    }
D
dapan1121 已提交
2349 2350 2351 2352 2353 2354
  }

  taosMemoryFreeClear(gCtgMgmt.queue.head);
  gCtgMgmt.queue.tail = NULL;
}

dengyihao's avatar
dengyihao 已提交
2355
void *ctgUpdateThreadFunc(void *param) {
D
dapan1121 已提交
2356
  setThreadName("catalog");
2357

D
dapan1121 已提交
2358 2359 2360 2361 2362 2363
  qInfo("catalog update thread started");

  while (true) {
    if (tsem_wait(&gCtgMgmt.queue.reqSem)) {
      qError("ctg tsem_wait failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
    }
dengyihao's avatar
dengyihao 已提交
2364

2365
    if (atomic_load_8((int8_t *)&gCtgMgmt.queue.stopQueue)) {
D
dapan1121 已提交
2366
      ctgCleanupCacheQueue();
D
dapan1121 已提交
2367 2368 2369
      break;
    }

D
dapan1121 已提交
2370 2371 2372
    SCtgCacheOperation *operation = NULL;
    ctgDequeue(&operation);
    SCatalog *pCtg = ((SCtgUpdateMsgHeader *)operation->data)->pCtg;
D
dapan1121 已提交
2373

D
dapan1121 已提交
2374
    ctgDebug("process [%s] operation", gCtgCacheOperation[operation->opId].name);
dengyihao's avatar
dengyihao 已提交
2375

D
dapan1121 已提交
2376
    (*gCtgCacheOperation[operation->opId].func)(operation);
D
dapan1121 已提交
2377

D
dapan1121 已提交
2378
    if (operation->syncOp) {
D
dapan1121 已提交
2379
      tsem_post(&operation->rspSem);
D
dapan1121 已提交
2380 2381
    } else {
      taosMemoryFreeClear(operation);
D
dapan1121 已提交
2382 2383
    }

D
dapan1121 已提交
2384
    CTG_STAT_RT_INC(numOfOpDequeue, 1);
D
dapan1121 已提交
2385

D
dapan1121 已提交
2386
    ctgdShowCacheInfo();
D
dapan1121 已提交
2387
    ctgdShowStatInfo();
D
dapan1121 已提交
2388 2389 2390
  }

  qInfo("catalog update thread stopped");
dengyihao's avatar
dengyihao 已提交
2391

D
dapan1121 已提交
2392 2393 2394 2395 2396 2397 2398 2399 2400 2401 2402 2403
  return NULL;
}

/*
 * Start the catalog update thread (ctgUpdateThreadFunc) as a joinable thread
 * and store its handle in gCtgMgmt.updateThread.
 *
 * Returns TSDB_CODE_SUCCESS on success. On creation failure terrno is set to
 * TAOS_SYSTEM_ERROR(errno) and that code is returned. The thread attribute
 * object is destroyed on both paths.
 */
int32_t ctgStartUpdateThread() {
  int32_t      code = TSDB_CODE_SUCCESS;
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

  if (taosThreadCreate(&gCtgMgmt.updateThread, &thAttr, ctgUpdateThreadFunc, NULL) != 0) {
    // capture errno right away, before the attribute cleanup can overwrite it
    code = TAOS_SYSTEM_ERROR(errno);
    terrno = code;
  }

  // the attribute object is no longer needed whether or not the thread started
  taosThreadAttrDestroy(&thAttr);

  CTG_ERR_RET(code);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
2409
/*
 * Look up table meta in the cache and decide whether the cached copy is usable.
 *
 * On return *pTableMeta is the cached meta when it matches the requested table
 * kind (per ctx->flag) and either no forced update was requested or the table
 * belongs to a system database; otherwise it is NULL. When the result is NULL
 * and the table kind in ctx->flag is still unknown, the flag is updated from
 * ctx->tbInfo.tbType.
 *
 * Returns the error from ctgReadTbMetaFromCache, or TSDB_CODE_SUCCESS.
 */
int32_t ctgGetTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta **pTableMeta) {
  if (IS_SYS_DBNAME(ctx->pName->dbname)) {
    CTG_FLAG_SET_SYS_DB(ctx->flag);
  }

  CTG_ERR_RET(ctgReadTbMetaFromCache(pCtg, ctx, pTableMeta));

  if (NULL != *pTableMeta) {
    if (CTG_FLAG_MATCH_STB(ctx->flag, (*pTableMeta)->tableType)) {
      if (!CTG_FLAG_IS_FORCE_UPDATE(ctx->flag) || CTG_FLAG_IS_SYS_DB(ctx->flag)) {
        return TSDB_CODE_SUCCESS;
      }
    }

    // cached copy is not acceptable for this request: discard it
    taosMemoryFreeClear(*pTableMeta);
    *pTableMeta = NULL;
  }

  if (CTG_FLAG_IS_UNKNOWN_STB(ctx->flag)) {
    CTG_FLAG_SET_STB(ctx->flag, ctx->tbInfo.tbType);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
2433
#if 0
// Compiled out. Batch variant of the single-table cache lookup: for each name
// in ctx->pNames it pushes one SMetaRes into ctx->pResList (holding the cached
// meta when usable) and records a SCtgFetch in ctx->pFetchs for every miss.
// When nothing had to be fetched, the result list is swapped into *pResList.
int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetasCtx* ctx, SArray** pResList) {
  int32_t tbNum = taosArrayGetSize(ctx->pNames);
  SName* fName = taosArrayGet(ctx->pNames, 0);
  int32_t fIdx = 0;

  for (int32_t tbIdx = 0; tbIdx < tbNum; ++tbIdx) {
    SName* pName = taosArrayGet(ctx->pNames, tbIdx);

    SCtgTbMetaCtx nctx = {0};
    nctx.pName = pName;
    nctx.flag = CTG_FLAG_UNKNOWN_STB;
    if (IS_SYS_DBNAME(pName->dbname)) {
      CTG_FLAG_SET_SYS_DB(nctx.flag);
    }

    STableMeta *pTableMeta = NULL;
    CTG_ERR_RET(ctgReadTbMetaFromCache(pCtg, &nctx, &pTableMeta));

    SMetaRes res = {0};
    if (NULL != pTableMeta) {
      if (CTG_FLAG_MATCH_STB(nctx.flag, pTableMeta->tableType) &&
          ((!CTG_FLAG_IS_FORCE_UPDATE(nctx.flag)) || (CTG_FLAG_IS_SYS_DB(nctx.flag)))) {
        res.pRes = pTableMeta;
      } else {
        taosMemoryFreeClear(pTableMeta);
      }
    }

    if (NULL == res.pRes) {
      // cache miss (or unusable entry): schedule a fetch for this table
      if (NULL == ctx->pFetchs) {
        ctx->pFetchs = taosArrayInit(tbNum, sizeof(SCtgFetch));
      }

      if (CTG_FLAG_IS_UNKNOWN_STB(nctx.flag)) {
        CTG_FLAG_SET_STB(nctx.flag, nctx.tbInfo.tbType);
      }

      SCtgFetch fetch = {0};
      fetch.tbIdx = tbIdx;
      fetch.fetchIdx = fIdx++;
      fetch.flag = nctx.flag;

      taosArrayPush(ctx->pFetchs, &fetch);
    }

    taosArrayPush(ctx->pResList, &res);
  }

  if (NULL == ctx->pFetchs) {
    TSWAP(*pResList, ctx->pResList);
  }

  return TSDB_CODE_SUCCESS;
}
#endif

dengyihao's avatar
dengyihao 已提交
2490 2491 2492 2493 2494 2495 2496 2497 2498
/*
 * Resolve the meta of every table in pList (all belonging to the database of
 * the first entry) from the cache.
 *
 * For each table exactly one SMetaRes is appended to ctx->pResList:
 *   - on a hit, res.pRes is a freshly allocated copy of the table meta;
 *   - on a miss, an empty SMetaRes is appended and a fetch entry is recorded
 *     through ctgAddFetch (result slot baseResIdx + i).
 * For a child table the cached entry only holds the SCTableMeta header, so the
 * super table is looked up via dbCache->stbCache and its schema part is
 * appended to the copy. Consecutive child tables of the same super table reuse
 * the previously assembled meta instead of repeating the lookup.
 *
 * pConn is not used by this function.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when a copy cannot be
 * allocated. The out-of-memory exits go through ctgReleaseTbMetaToCache only;
 * presumably that helper also drops the dbCache reference -- TODO confirm.
 */
int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMetasCtx *ctx, int32_t dbIdx,
                               int32_t *fetchIdx, int32_t baseResIdx, SArray *pList) {
  int32_t     tbNum = taosArrayGetSize(pList);
  SName      *pName = taosArrayGet(pList, 0);
  char        dbFName[TSDB_DB_FNAME_LEN] = {0};
  int32_t     flag = CTG_FLAG_UNKNOWN_STB;
  uint64_t    lastSuid = 0;
  STableMeta *lastTableMeta = NULL;

  if (IS_SYS_DBNAME(pName->dbname)) {
    CTG_FLAG_SET_SYS_DB(flag);
    strcpy(dbFName, pName->dbname);
  } else {
    tNameGetFullDbName(pName, dbFName);
  }

  SCtgDBCache *dbCache = NULL;
  SCtgTbCache *pCache = NULL;
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);

  if (NULL == dbCache) {
    // whole database is missing from the cache: every table must be fetched
    ctgDebug("db %s not in cache", dbFName);
    for (int32_t i = 0; i < tbNum; ++i) {
      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      // same element type as every other push into pResList
      taosArrayPush(ctx->pResList, &(SMetaRes){0});
    }

    return TSDB_CODE_SUCCESS;
  }

  for (int32_t i = 0; i < tbNum; ++i) {
    pName = taosArrayGet(pList, i);

    pCache = taosHashAcquire(dbCache->tbCache, pName->tname, strlen(pName->tname));
    if (NULL == pCache) {
      ctgDebug("tb %s not in cache, dbFName:%s", pName->tname, dbFName);
      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      taosArrayPush(ctx->pResList, &(SMetaRes){0});
      CTG_META_NHIT_INC();

      continue;
    }

    CTG_LOCK(CTG_READ, &pCache->metaLock);
    if (NULL == pCache->pMeta) {
      CTG_UNLOCK(CTG_READ, &pCache->metaLock);
      // drop the reference taken by taosHashAcquire above, as the super-table
      // miss path further down does
      taosHashRelease(dbCache->tbCache, pCache);

      ctgDebug("tb %s meta not in cache, dbFName:%s", pName->tname, dbFName);
      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      taosArrayPush(ctx->pResList, &(SMetaRes){0});
      CTG_META_NHIT_INC();

      continue;
    }

    STableMeta *tbMeta = pCache->pMeta;

    CTG_META_HIT_INC(tbMeta->tableType);

    // snapshot the fields needed after the cache entry is released
    SCtgTbMetaCtx nctx = {0};
    nctx.flag = flag;
    nctx.tbInfo.inCache = true;
    nctx.tbInfo.dbId = dbCache->dbId;
    nctx.tbInfo.suid = tbMeta->suid;
    nctx.tbInfo.tbType = tbMeta->tableType;

    SMetaRes    res = {0};
    STableMeta *pTableMeta = NULL;
    if (tbMeta->tableType != TSDB_CHILD_TABLE) {
      // non-child table: the cached entry is complete, copy it as a whole
      int32_t metaSize = CTG_META_SIZE(tbMeta);
      pTableMeta = taosMemoryCalloc(1, metaSize);
      if (NULL == pTableMeta) {
        ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
        CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
      }

      memcpy(pTableMeta, tbMeta, metaSize);

      CTG_UNLOCK(CTG_READ, &pCache->metaLock);
      taosHashRelease(dbCache->tbCache, pCache);

      // tbMeta must not be read past this point; use the snapshot instead
      ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pName->tname, nctx.tbInfo.tbType, dbFName);

      res.pRes = pTableMeta;
      taosArrayPush(ctx->pResList, &res);

      continue;
    }

    // PROCESS FOR CHILD TABLE

    if (lastSuid && tbMeta->suid == lastSuid && lastTableMeta) {
      // same super table as the previous child: clone its assembled meta and
      // overwrite only the child-specific header
      cloneTableMeta(lastTableMeta, &pTableMeta);
      if (NULL == pTableMeta) {
        ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
        CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
      }

      memcpy(pTableMeta, tbMeta, sizeof(SCTableMeta));

      CTG_UNLOCK(CTG_READ, &pCache->metaLock);
      taosHashRelease(dbCache->tbCache, pCache);

      ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pName->tname, nctx.tbInfo.tbType, dbFName);

      res.pRes = pTableMeta;
      taosArrayPush(ctx->pResList, &res);

      continue;
    }

    // first copy just the child header, then look for the super table schema
    int32_t metaSize = sizeof(SCTableMeta);
    pTableMeta = taosMemoryCalloc(1, metaSize);
    if (NULL == pTableMeta) {
      ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    memcpy(pTableMeta, tbMeta, metaSize);

    CTG_UNLOCK(CTG_READ, &pCache->metaLock);
    taosHashRelease(dbCache->tbCache, pCache);

    ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", pName->tname,
             nctx.tbInfo.tbType, dbFName);

    char *stName = taosHashAcquire(dbCache->stbCache, &pTableMeta->suid, sizeof(pTableMeta->suid));
    if (NULL == stName) {
      ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", pTableMeta->suid, dbFName);
      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      taosArrayPush(ctx->pResList, &(SMetaRes){0});
      taosMemoryFreeClear(pTableMeta);

      CTG_META_NHIT_INC();
      continue;
    }

    pCache = taosHashAcquire(dbCache->tbCache, stName, strlen(stName));
    if (NULL == pCache) {
      ctgDebug("stb 0x%" PRIx64 " name %s not in cache, dbFName:%s", pTableMeta->suid, stName, dbFName);
      taosHashRelease(dbCache->stbCache, stName);

      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      taosArrayPush(ctx->pResList, &(SMetaRes){0});
      taosMemoryFreeClear(pTableMeta);

      CTG_META_NHIT_INC();
      continue;
    }

    taosHashRelease(dbCache->stbCache, stName);

    CTG_LOCK(CTG_READ, &pCache->metaLock);
    if (NULL == pCache->pMeta) {
      ctgDebug("stb 0x%" PRIx64 " meta not in cache, dbFName:%s", pTableMeta->suid, dbFName);
      CTG_UNLOCK(CTG_READ, &pCache->metaLock);
      taosHashRelease(dbCache->tbCache, pCache);

      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      taosArrayPush(ctx->pResList, &(SMetaRes){0});
      taosMemoryFreeClear(pTableMeta);

      CTG_META_NHIT_INC();
      continue;
    }

    STableMeta *stbMeta = pCache->pMeta;
    if (stbMeta->suid != nctx.tbInfo.suid) {
      // log while the entry is still locked: stbMeta is not ours after release
      ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%" PRIx64, stbMeta->suid,
               nctx.tbInfo.suid);

      CTG_UNLOCK(CTG_READ, &pCache->metaLock);
      taosHashRelease(dbCache->tbCache, pCache);

      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      taosArrayPush(ctx->pResList, &(SMetaRes){0});
      taosMemoryFreeClear(pTableMeta);

      CTG_META_NHIT_INC();
      continue;
    }

    // grow the child header copy so the super table part fits behind it;
    // keep the old pointer until the resize is known to have succeeded
    metaSize = CTG_META_SIZE(stbMeta);
    STableMeta *pFullMeta = taosMemoryRealloc(pTableMeta, metaSize);
    if (NULL == pFullMeta) {
      taosMemoryFreeClear(pTableMeta);
      ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    pTableMeta = pFullMeta;

    memcpy(&pTableMeta->sversion, &stbMeta->sversion, metaSize - sizeof(SCTableMeta));

    CTG_UNLOCK(CTG_READ, &pCache->metaLock);
    taosHashRelease(dbCache->tbCache, pCache);

    CTG_META_HIT_INC(pTableMeta->tableType);

    res.pRes = pTableMeta;
    taosArrayPush(ctx->pResList, &res);

    // remember this result so following children of the same stb can clone it
    lastSuid = pTableMeta->suid;
    lastTableMeta = pTableMeta;
  }

  ctgReleaseDBCache(pCtg, dbCache);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2692
/*
 * Queue the removal of a table's meta from the cache.
 *
 * The table is first read from the cache; when it is not there nothing is
 * queued and TSDB_CODE_SUCCESS is returned. Otherwise a drop request is
 * enqueued -- the super-table variant when the cached meta is a super table,
 * the plain-table variant for anything else. syncReq is passed through to the
 * enqueue call. The meta copy obtained for the lookup is freed before
 * returning.
 */
int32_t ctgRemoveTbMetaFromCache(SCatalog *pCtg, SName *pTableName, bool syncReq) {
  int32_t       code = 0;
  char          dbFName[TSDB_DB_FNAME_LEN];
  STableMeta   *pCached = NULL;
  SCtgTbMetaCtx readCtx = {0};

  readCtx.pName = pTableName;
  readCtx.flag = CTG_FLAG_UNKNOWN_STB;

  CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &readCtx, &pCached));

  if (NULL == pCached) {
    ctgDebug("table already not in cache, db:%s, tblName:%s", pTableName->dbname, pTableName->tname);
    return TSDB_CODE_SUCCESS;
  }

  tNameGetFullDbName(pTableName, dbFName);

  if (TSDB_SUPER_TABLE != pCached->tableType) {
    CTG_ERR_JRET(ctgDropTbMetaEnqueue(pCtg, dbFName, readCtx.tbInfo.dbId, pTableName->tname, syncReq));
  } else {
    CTG_ERR_JRET(ctgDropStbMetaEnqueue(pCtg, dbFName, readCtx.tbInfo.dbId, pTableName->tname, pCached->suid, syncReq));
  }

_return:

  taosMemoryFreeClear(pCached);

  CTG_RET(code);
}

/*
 * Find the vgroup a table hashes to, using only cached vgroup info.
 *
 * On success *pVgroup is either a newly allocated SVgroupInfo, or NULL when
 * the database's vgroup info is not in the cache. On failure after the
 * allocation, *pVgroup is freed and reset.
 *
 * Returns TSDB_CODE_CTG_INVALID_INPUT for a system database name,
 * TSDB_CODE_OUT_OF_MEMORY when the result cannot be allocated, or the error
 * reported by the cache/hash lookup helpers.
 */
int32_t ctgGetTbHashVgroupFromCache(SCatalog *pCtg, const SName *pTableName, SVgroupInfo **pVgroup) {
  if (IS_SYS_DBNAME(pTableName->dbname)) {
    ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname);
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  SCtgDBCache *dbCache = NULL;
  int32_t      code = 0;
  char         dbFName[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, dbFName);

  CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache));

  if (NULL == dbCache) {
    *pVgroup = NULL;
    return TSDB_CODE_SUCCESS;
  }

  *pVgroup = taosMemoryCalloc(1, sizeof(SVgroupInfo));
  if (NULL == *pVgroup) {
    // do not hand a NULL output buffer to the hash lookup
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pTableName, *pVgroup));

_return:

  if (dbCache) {
    ctgReleaseVgInfoToCache(pCtg, dbCache);
  }

  if (code) {
    taosMemoryFreeClear(*pVgroup);
  }

  CTG_RET(code);
}