ctgCache.c 74.3 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "catalogInt.h"
dengyihao's avatar
dengyihao 已提交
17
#include "query.h"
D
dapan1121 已提交
18
#include "systable.h"
dengyihao's avatar
dengyihao 已提交
19 20
#include "tname.h"
#include "trpc.h"
D
dapan1121 已提交
21

dengyihao's avatar
dengyihao 已提交
22 23 24 25 26 27 28 29 30 31 32
// Table describing every catalog cache operation: operation id, the name used in
// log messages, and the ctgOp* function associated with the operation.
// The table is indexed by operation id elsewhere in this file
// (gCtgCacheOperation[opId].name), so each entry presumably has to sit at the index
// equal to its CTG_OP_* value -- keep the order in sync with the enum.
SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = {
    {CTG_OP_UPDATE_VGROUP, "update vgInfo", ctgOpUpdateVgroup},
    {CTG_OP_UPDATE_TB_META, "update tbMeta", ctgOpUpdateTbMeta},
    {CTG_OP_DROP_DB_CACHE, "drop DB", ctgOpDropDbCache},
    {CTG_OP_DROP_DB_VGROUP, "drop DBVgroup", ctgOpDropDbVgroup},
    {CTG_OP_DROP_STB_META, "drop stbMeta", ctgOpDropStbMeta},
    {CTG_OP_DROP_TB_META, "drop tbMeta", ctgOpDropTbMeta},
    {CTG_OP_UPDATE_USER, "update user", ctgOpUpdateUser},
    {CTG_OP_UPDATE_VG_EPSET, "update epset", ctgOpUpdateEpset},
    {CTG_OP_UPDATE_TB_INDEX, "update tbIndex", ctgOpUpdateTbIndex},
    {CTG_OP_DROP_TB_INDEX, "drop tbIndex", ctgOpDropTbIndex},
    {CTG_OP_CLEAR_CACHE, "clear cache", ctgOpClearCache},
};
D
dapan1121 已提交
33

D
dapan1121 已提交
34 35
/*
 * Take the read lock on dbCache's vgroup info.
 *
 * *inCache is set to true when the db is not marked deleted and vgInfo is present;
 * in that case the read lock is still held on return and must be dropped with
 * ctgRUnlockVgInfo(). Otherwise the lock is released before returning and
 * *inCache is set to false.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgRLockVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache, bool *inCache) {
  CTG_LOCK(CTG_READ, &dbCache->vgCache.vgLock);

  // Evaluate both conditions while the lock is held.
  bool dropping = dbCache->deleted;
  bool vgReady = !dropping && (NULL != dbCache->vgCache.vgInfo);

  if (vgReady) {
    *inCache = true;
    return TSDB_CODE_SUCCESS;
  }

  CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock);
  *inCache = false;

  if (dropping) {
    ctgDebug("db is dropping, dbId:0x%" PRIx64, dbCache->dbId);
  } else {
    ctgDebug("db vgInfo is empty, dbId:0x%" PRIx64, dbCache->dbId);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
59 60
/*
 * Take the write lock on dbCache's vgroup info.
 *
 * On TSDB_CODE_SUCCESS the write lock is held and must be dropped with
 * ctgWUnlockVgInfo(). When the db is marked deleted the lock is released again
 * and TSDB_CODE_CTG_DB_DROPPED is reported through CTG_ERR_RET.
 */
int32_t ctgWLockVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache) {
  CTG_LOCK(CTG_WRITE, &dbCache->vgCache.vgLock);

  if (!dbCache->deleted) {
    return TSDB_CODE_SUCCESS;
  }

  ctgDebug("db is dropping, dbId:0x%" PRIx64, dbCache->dbId);
  CTG_UNLOCK(CTG_WRITE, &dbCache->vgCache.vgLock);
  CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
71
// Drop the read lock on dbCache's vgroup info (counterpart of ctgRLockVgInfo).
void ctgRUnlockVgInfo(SCtgDBCache *dbCache) {
  CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock);
}
D
dapan1121 已提交
72

dengyihao's avatar
dengyihao 已提交
73
// Drop the write lock on dbCache's vgroup info (counterpart of ctgWLockVgInfo).
void ctgWUnlockVgInfo(SCtgDBCache *dbCache) {
  CTG_UNLOCK(CTG_WRITE, &dbCache->vgCache.vgLock);
}
D
dapan1121 已提交
74

dengyihao's avatar
dengyihao 已提交
75 76
/*
 * Give back a db cache entry obtained through ctgAcquireDBCache(): drop the read
 * lock on the entry's dbLock first, then release the entry to pCtg->dbCache.
 */
void ctgReleaseDBCache(SCatalog *pCtg, SCtgDBCache *dbCache) {
  // Unlock before releasing: the entry is not touched after taosHashRelease().
  CTG_UNLOCK(CTG_READ, &dbCache->dbLock);

  taosHashRelease(pCtg->dbCache, dbCache);
}
D
dapan1121 已提交
79

dengyihao's avatar
dengyihao 已提交
80
/*
 * Look up the cache entry of database `dbFName` in pCtg->dbCache.
 *
 * When the text following the first '.' in `dbFName` is a system db name, only
 * that part is used as the lookup key.
 *
 * acquire == true : the entry is fetched with taosHashAcquire() and its dbLock is
 *                   read-locked; hand it back with ctgReleaseDBCache().
 * acquire == false: the entry is fetched with taosHashGet(); no lock is taken.
 *
 * *pCache receives the entry, or NULL when the db is absent or marked deleted.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgAcquireDBCacheImpl(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache, bool acquire) {
  char *dot = strchr(dbFName, '.');
  if (NULL != dot && IS_SYS_DBNAME(dot + 1)) {
    dbFName = dot + 1;
  }

  size_t       keyLen = strlen(dbFName);
  SCtgDBCache *entry = acquire ? (SCtgDBCache *)taosHashAcquire(pCtg->dbCache, dbFName, keyLen)
                               : (SCtgDBCache *)taosHashGet(pCtg->dbCache, dbFName, keyLen);
  if (NULL == entry) {
    *pCache = NULL;
    ctgDebug("db not in cache, dbFName:%s", dbFName);
    return TSDB_CODE_SUCCESS;
  }

  if (acquire) {
    CTG_LOCK(CTG_READ, &entry->dbLock);
  }

  if (!entry->deleted) {
    *pCache = entry;
    return TSDB_CODE_SUCCESS;
  }

  // The db is being removed: undo the acquire (if any) and report a miss.
  if (acquire) {
    ctgReleaseDBCache(pCtg, entry);
  }

  *pCache = NULL;
  ctgDebug("db is removing from cache, dbFName:%s", dbFName);
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
119
// Acquiring variant of ctgAcquireDBCacheImpl(): a returned entry is read-locked and
// must be handed back with ctgReleaseDBCache().
int32_t ctgAcquireDBCache(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache) {
  const bool acquire = true;
  CTG_RET(ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, acquire));
}

dengyihao's avatar
dengyihao 已提交
123
// Non-acquiring variant of ctgAcquireDBCacheImpl(): the entry is only looked up, no
// dbLock is taken, so there is nothing to release afterwards.
int32_t ctgGetDBCache(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache) {
  const bool acquire = false;
  CTG_RET(ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, acquire));
}

dengyihao's avatar
dengyihao 已提交
127
// Undo ctgAcquireVgInfoFromCache(): drop the vgroup read lock, then release the db
// cache entry itself.
void ctgReleaseVgInfoToCache(SCatalog *pCtg, SCtgDBCache *dbCache) {
  CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock);
  ctgReleaseDBCache(pCtg, dbCache);
}
D
dapan1121 已提交
131

dengyihao's avatar
dengyihao 已提交
132
/*
 * Release what a table-meta lookup acquired.
 *
 * Nothing happens when dbCache is NULL. Otherwise the table entry (when non-NULL)
 * has its metaLock read lock dropped and is released to dbCache->tbCache, and then
 * the db cache entry is released as well.
 */
void ctgReleaseTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache) {
  if (NULL == dbCache) {
    return;
  }

  if (NULL != pCache) {
    CTG_UNLOCK(CTG_READ, &pCache->metaLock);
    taosHashRelease(dbCache->tbCache, pCache);
  }

  ctgReleaseDBCache(pCtg, dbCache);
}
D
dapan1121 已提交
142

dengyihao's avatar
dengyihao 已提交
143
/*
 * Release what a table-index lookup acquired.
 *
 * The table entry has its indexLock read lock dropped and is released to
 * dbCache->tbCache; afterwards the db cache entry is released.
 *
 * Both steps are skipped for NULL arguments. The table entry is only touched when
 * dbCache is non-NULL too, because releasing it needs dbCache->tbCache; this
 * matches the guard used by ctgReleaseTbMetaToCache().
 */
void ctgReleaseTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache) {
  if (pCache && dbCache) {
    CTG_UNLOCK(CTG_READ, &pCache->indexLock);
    taosHashRelease(dbCache->tbCache, pCache);
  }

  if (dbCache) {
    ctgReleaseDBCache(pCtg, dbCache);
  }
}

154
/*
 * Release what ctgAcquireVgMetaFromCache() acquired.
 *
 * Nothing happens when dbCache is NULL. Otherwise the table entry (when non-NULL)
 * has its metaLock read lock dropped and is released to dbCache->tbCache, then the
 * vgroup read lock is dropped and the db cache entry is released.
 */
void ctgReleaseVgMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache) {
  if (NULL == dbCache) {
    return;
  }

  if (NULL != pCache) {
    CTG_UNLOCK(CTG_READ, &pCache->metaLock);
    taosHashRelease(dbCache->tbCache, pCache);
  }

  // Same two steps as ctgReleaseVgInfoToCache().
  ctgRUnlockVgInfo(dbCache);
  ctgReleaseDBCache(pCtg, dbCache);
}

dengyihao's avatar
dengyihao 已提交
166
/*
 * Acquire the db cache entry of `dbFName` with its vgroup info read-locked.
 *
 * Hit : *pCache is the entry; the caller must call ctgReleaseVgInfoToCache().
 * Miss: *pCache is NULL and nothing is left held (db absent, or vgInfo not cached).
 *
 * The numOfVgHit / numOfVgMiss cache statistic is bumped accordingly.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgAcquireVgInfoFromCache(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache) {
  SCtgDBCache *entry = NULL;
  bool         vgReady = false;

  ctgAcquireDBCache(pCtg, dbFName, &entry);
  if (NULL == entry) {
    ctgDebug("db %s not in cache", dbFName);
  } else {
    ctgRLockVgInfo(pCtg, entry, &vgReady);
    if (!vgReady) {
      ctgDebug("vgInfo of db %s not in cache", dbFName);
    }
  }

  if (vgReady) {
    *pCache = entry;
    CTG_CACHE_STAT_INC(numOfVgHit, 1);
    ctgDebug("Got db vgInfo from cache, dbFName:%s", dbFName);
    return TSDB_CODE_SUCCESS;
  }

  // Miss: ctgRLockVgInfo() already dropped the vg lock, only the db entry remains.
  if (NULL != entry) {
    ctgReleaseDBCache(pCtg, entry);
  }

  *pCache = NULL;
  CTG_CACHE_STAT_INC(numOfVgMiss, 1);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
202
/*
 * Acquire the cached meta of table `tbName` in database `dbFName`.
 *
 * Hit : *pDb / *pTb receive the db and table entries; the table's metaLock is
 *       read-locked. Release both with ctgReleaseTbMetaToCache().
 * Miss: everything acquired on the way is released again and *pDb / *pTb are left
 *       untouched, so callers must pre-initialise them to NULL.
 *
 * The numOfMetaHit / numOfMetaMiss cache statistic is bumped accordingly.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgAcquireTbMetaFromCache(SCatalog *pCtg, char *dbFName, char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb) {
  SCtgDBCache *db = NULL;
  SCtgTbCache *tb = NULL;
  bool         hit = false;

  ctgAcquireDBCache(pCtg, dbFName, &db);
  if (NULL == db) {
    ctgDebug("db %s not in cache", dbFName);
  } else {
    tb = taosHashAcquire(db->tbCache, tbName, strlen(tbName));
    if (NULL == tb) {
      ctgDebug("tb %s not in cache, dbFName:%s", tbName, dbFName);
    } else {
      CTG_LOCK(CTG_READ, &tb->metaLock);
      if (NULL == tb->pMeta) {
        ctgDebug("tb %s meta not in cache, dbFName:%s", tbName, dbFName);
      } else {
        hit = true;
      }
    }
  }

  if (!hit) {
    ctgReleaseTbMetaToCache(pCtg, db, tb);
    CTG_CACHE_STAT_INC(numOfMetaMiss, 1);
    return TSDB_CODE_SUCCESS;
  }

  *pDb = db;
  *pTb = tb;

  ctgDebug("tb %s meta got in cache, dbFName:%s", tbName, dbFName);
  CTG_CACHE_STAT_INC(numOfMetaHit, 1);

  return TSDB_CODE_SUCCESS;
}

241 242 243
/*
 * Acquire both the vgroup info of `dbFName` and the cached meta of `tbName`.
 *
 * Hit : *pDb is the db entry (vgroup info read-locked) and *pTb the table entry
 *       (metaLock read-locked). Release with ctgReleaseVgMetaToCache().
 * Miss: if either part is unavailable, everything taken so far is released and
 *       both *pDb and *pTb are set to NULL.
 *
 * Vg and meta hit/miss statistics are counted separately, so a vg hit followed by
 * a meta miss bumps numOfVgHit and numOfMetaMiss.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgAcquireVgMetaFromCache(SCatalog *pCtg, const char *dbFName, const char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb) {
  SCtgDBCache *db = NULL;
  SCtgTbCache *tb = NULL;
  bool         vgLocked = false;

  ctgAcquireDBCache(pCtg, dbFName, &db);
  if (NULL == db) {
    ctgDebug("db %s not in cache", dbFName);
    CTG_CACHE_STAT_INC(numOfVgMiss, 1);
    goto _return;
  }

  ctgRLockVgInfo(pCtg, db, &vgLocked);
  if (!vgLocked) {
    ctgDebug("vgInfo of db %s not in cache", dbFName);
    CTG_CACHE_STAT_INC(numOfVgMiss, 1);
    goto _return;
  }

  *pDb = db;
  CTG_CACHE_STAT_INC(numOfVgHit, 1);
  ctgDebug("Got db vgInfo from cache, dbFName:%s", dbFName);

  tb = taosHashAcquire(db->tbCache, tbName, strlen(tbName));
  if (NULL == tb) {
    ctgDebug("tb %s not in cache, dbFName:%s", tbName, dbFName);
    CTG_CACHE_STAT_INC(numOfMetaMiss, 1);
    goto _return;
  }

  CTG_LOCK(CTG_READ, &tb->metaLock);
  if (NULL == tb->pMeta) {
    ctgDebug("tb %s meta not in cache, dbFName:%s", tbName, dbFName);
    CTG_CACHE_STAT_INC(numOfMetaMiss, 1);
    goto _return;
  }

  *pTb = tb;
  ctgDebug("tb %s meta got in cache, dbFName:%s", tbName, dbFName);
  CTG_CACHE_STAT_INC(numOfMetaHit, 1);

  return TSDB_CODE_SUCCESS;

_return:

  // Unwind in reverse order of acquisition: table entry, vg lock, db entry.
  if (NULL != tb) {
    CTG_UNLOCK(CTG_READ, &tb->metaLock);
    taosHashRelease(db->tbCache, tb);
  }

  if (vgLocked) {
    ctgRUnlockVgInfo(db);
  }

  if (NULL != db) {
    ctgReleaseDBCache(pCtg, db);
  }

  *pDb = NULL;
  *pTb = NULL;

  return TSDB_CODE_SUCCESS;
}


310
/*
dengyihao's avatar
dengyihao 已提交
311 312 313
int32_t ctgAcquireStbMetaFromCache(SCatalog *pCtg, char *dbFName, uint64_t suid, SCtgDBCache **pDb, SCtgTbCache **pTb) {
  SCtgDBCache *dbCache = NULL;
  SCtgTbCache *pCache = NULL;
D
dapan1121 已提交
314 315 316 317 318
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
  if (NULL == dbCache) {
    ctgDebug("db %s not in cache", dbFName);
    goto _return;
  }
dengyihao's avatar
dengyihao 已提交
319 320

  char *stName = taosHashAcquire(dbCache->stbCache, &suid, sizeof(suid));
D
dapan1121 已提交
321
  if (NULL == stName) {
D
dapan1121 已提交
322
    ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", suid, dbFName);
D
dapan1121 已提交
323 324 325 326 327
    goto _return;
  }

  pCache = taosHashAcquire(dbCache->tbCache, stName, strlen(stName));
  if (NULL == pCache) {
D
dapan1121 已提交
328
    ctgDebug("stb 0x%" PRIx64 " name %s not in cache, dbFName:%s", suid, stName, dbFName);
D
dapan1121 已提交
329 330 331 332
    taosHashRelease(dbCache->stbCache, stName);
    goto _return;
  }

D
dapan1121 已提交
333 334
  taosHashRelease(dbCache->stbCache, stName);

D
dapan1121 已提交
335 336
  CTG_LOCK(CTG_READ, &pCache->metaLock);
  if (NULL == pCache->pMeta) {
D
dapan1121 已提交
337
    ctgDebug("stb 0x%" PRIx64 " meta not in cache, dbFName:%s", suid, dbFName);
D
dapan1121 已提交
338 339 340 341 342 343
    goto _return;
  }

  *pDb = dbCache;
  *pTb = pCache;

D
dapan1121 已提交
344
  ctgDebug("stb 0x%" PRIx64 " meta got in cache, dbFName:%s", suid, dbFName);
dengyihao's avatar
dengyihao 已提交
345

D
dapan1121 已提交
346
  CTG_CACHE_STAT_INC(numOfMetaHit, 1);
D
dapan1121 已提交
347 348 349 350 351 352 353

  return TSDB_CODE_SUCCESS;

_return:

  ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);

D
dapan1121 已提交
354
  CTG_CACHE_STAT_INC(numOfMetaMiss, 1);
D
dapan1121 已提交
355 356 357

  *pDb = NULL;
  *pTb = NULL;
dengyihao's avatar
dengyihao 已提交
358

D
dapan1121 已提交
359 360
  return TSDB_CODE_SUCCESS;
}
361
*/
D
dapan1121 已提交
362

363
/*
 * Acquire the cached meta of the super table identified by `suid`, using a db
 * cache entry the caller has already acquired.
 *
 * The suid is first mapped to the stable's name through dbCache->stbCache, then
 * the name is looked up in dbCache->tbCache.
 *
 * Hit : *pTb is the table entry with its metaLock read-locked; dbCache stays held.
 * Miss: *pTb is NULL. NOTE: the miss path goes through ctgReleaseTbMetaToCache(),
 *       which also releases `dbCache`, so the caller must not release it again.
 *
 * The numOfMetaHit / numOfMetaMiss cache statistic is bumped accordingly.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgAcquireStbMetaFromCache(SCtgDBCache *dbCache, SCatalog *pCtg, char *dbFName, uint64_t suid, SCtgTbCache **pTb) {
  SCtgTbCache *stbEntry = NULL;

  char *stbName = taosHashAcquire(dbCache->stbCache, &suid, sizeof(suid));
  if (NULL == stbName) {
    ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", suid, dbFName);
    goto _return;
  }

  stbEntry = taosHashAcquire(dbCache->tbCache, stbName, strlen(stbName));
  if (NULL == stbEntry) {
    ctgDebug("stb 0x%" PRIx64 " name %s not in cache, dbFName:%s", suid, stbName, dbFName);
  }

  // The name is no longer needed once the tbCache lookup (and its log) is done.
  taosHashRelease(dbCache->stbCache, stbName);

  if (NULL == stbEntry) {
    goto _return;
  }

  CTG_LOCK(CTG_READ, &stbEntry->metaLock);
  if (NULL == stbEntry->pMeta) {
    ctgDebug("stb 0x%" PRIx64 " meta not in cache, dbFName:%s", suid, dbFName);
    goto _return;
  }

  *pTb = stbEntry;

  ctgDebug("stb 0x%" PRIx64 " meta got in cache, dbFName:%s", suid, dbFName);
  CTG_CACHE_STAT_INC(numOfMetaHit, 1);

  return TSDB_CODE_SUCCESS;

_return:

  ctgReleaseTbMetaToCache(pCtg, dbCache, stbEntry);
  CTG_CACHE_STAT_INC(numOfMetaMiss, 1);
  *pTb = NULL;

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
405
/*
 * Acquire the cached index info of table `tbName` in database `dbFName`.
 *
 * Hit : *pDb / *pTb receive the db and table entries; the table's indexLock is
 *       read-locked. Release both with ctgReleaseTbIndexToCache().
 * Miss: everything acquired on the way is released again and *pDb / *pTb are left
 *       untouched, so callers must pre-initialise them to NULL.
 *
 * The numOfIndexHit / numOfIndexMiss cache statistic is bumped accordingly.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgAcquireTbIndexFromCache(SCatalog *pCtg, char *dbFName, char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb) {
  SCtgDBCache *dbCache = NULL;
  SCtgTbCache *pCache = NULL;

  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
  if (NULL == dbCache) {
    ctgDebug("db %s not in cache", dbFName);
    goto _return;
  }

  pCache = taosHashAcquire(dbCache->tbCache, tbName, strlen(tbName));
  if (NULL == pCache) {
    ctgDebug("tb %s not in cache, dbFName:%s", tbName, dbFName);
    goto _return;
  }

  CTG_LOCK(CTG_READ, &pCache->indexLock);
  if (NULL == pCache->pIndex) {
    ctgDebug("tb %s index not in cache, dbFName:%s", tbName, dbFName);
    goto _return;
  }

  *pDb = dbCache;
  *pTb = pCache;

  ctgDebug("tb %s index got in cache, dbFName:%s", tbName, dbFName);

  CTG_CACHE_STAT_INC(numOfIndexHit, 1);

  return TSDB_CODE_SUCCESS;

_return:

  // Releases the index lock / table entry (if taken) and the db entry (if taken).
  ctgReleaseTbIndexToCache(pCtg, dbCache, pCache);

  CTG_CACHE_STAT_INC(numOfIndexMiss, 1);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
445
/*
 * Report through *exist (1 or 0) whether the meta of table `tbName` in database
 * `dbFName` is currently cached. Nothing stays acquired on return.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgTbMetaExistInCache(SCatalog *pCtg, char *dbFName, char *tbName, int32_t *exist) {
  SCtgDBCache *db = NULL;
  SCtgTbCache *tb = NULL;

  ctgAcquireTbMetaFromCache(pCtg, dbFName, tbName, &db, &tb);

  *exist = (NULL != tb) ? 1 : 0;

  // Safe on a miss as well: both pointers are still NULL then.
  ctgReleaseTbMetaToCache(pCtg, db, tb);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
462 463
/*
 * Copy a cached table meta into a newly allocated STableMeta (*pTableMeta) and
 * fill ctx->tbInfo (inCache, dbId, suid, tbType) from the cache entry.
 *
 * On entry *pDb / *pTb are the acquired db entry and the table entry whose
 * metaLock is read-locked.
 *
 * - Non-child table: the whole meta (CTG_META_SIZE bytes) is copied; *pDb / *pTb
 *   are unchanged and still held.
 * - Child table: only the SCTableMeta part is copied from the child entry, which
 *   is then released. The super table meta is acquired via
 *   ctgAcquireStbMetaFromCache() and everything from `sversion` onwards is copied
 *   from it into the (re-allocated) result.
 *     * stb not cached: *pTableMeta is freed and set to NULL, *pDb and *pTb are
 *       set to NULL (the stb lookup already released the db entry), and
 *       TSDB_CODE_SUCCESS is returned.
 *     * otherwise *pTb is replaced by the stb entry, which the caller releases.
 *
 * On every error return *pTableMeta is NULL; whatever *pDb / *pTb point to at
 * that moment is still held and must be released by the caller.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or
 * TSDB_CODE_CTG_INTERNAL_ERROR when the stb entry's suid does not match.
 */
int32_t ctgCopyTbMeta(SCatalog *pCtg, SCtgTbMetaCtx *ctx, SCtgDBCache **pDb, SCtgTbCache **pTb, STableMeta **pTableMeta, char* dbFName) {
  SCtgDBCache *dbCache = *pDb;
  SCtgTbCache *tbCache = *pTb;
  STableMeta  *tbMeta = tbCache->pMeta;

  ctx->tbInfo.inCache = true;
  ctx->tbInfo.dbId = dbCache->dbId;
  ctx->tbInfo.suid = tbMeta->suid;
  ctx->tbInfo.tbType = tbMeta->tableType;

  if (tbMeta->tableType != TSDB_CHILD_TABLE) {
    int32_t metaSize = CTG_META_SIZE(tbMeta);
    *pTableMeta = taosMemoryCalloc(1, metaSize);
    if (NULL == *pTableMeta) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    memcpy(*pTableMeta, tbMeta, metaSize);

    ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", ctx->pName->tname, tbMeta->tableType, dbFName);
    return TSDB_CODE_SUCCESS;
  }

  // PROCESS FOR CHILD TABLE

  int32_t metaSize = sizeof(SCTableMeta);
  *pTableMeta = taosMemoryCalloc(1, metaSize);
  if (NULL == *pTableMeta) {
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  memcpy(*pTableMeta, tbMeta, metaSize);

  // Release only the child table entry; the db entry is kept for the stb lookup.
  CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
  taosHashRelease(dbCache->tbCache, tbCache);
  *pTb = NULL;

  ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", ctx->pName->tname,
           ctx->tbInfo.tbType, dbFName);

  ctgAcquireStbMetaFromCache(dbCache, pCtg, dbFName, ctx->tbInfo.suid, &tbCache);
  if (NULL == tbCache) {
    taosMemoryFreeClear(*pTableMeta);
    // The failed stb lookup already released the db entry.
    *pDb = NULL;
    ctgDebug("stb 0x%" PRIx64 " meta not in cache", ctx->tbInfo.suid);
    return TSDB_CODE_SUCCESS;
  }

  *pTb = tbCache;

  STableMeta *stbMeta = tbCache->pMeta;
  if (stbMeta->suid != ctx->tbInfo.suid) {
    ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%" PRIx64, stbMeta->suid, ctx->tbInfo.suid);
    taosMemoryFreeClear(*pTableMeta);
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  metaSize = CTG_META_SIZE(stbMeta);

  // Grow through a temporary: a failed realloc leaves the old buffer allocated,
  // so it must still be freed instead of being overwritten with NULL.
  STableMeta *grown = taosMemoryRealloc(*pTableMeta, metaSize);
  if (NULL == grown) {
    taosMemoryFreeClear(*pTableMeta);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }
  *pTableMeta = grown;

  // Keep the child-specific SCTableMeta prefix, take the rest from the stb meta.
  memcpy(&(*pTableMeta)->sversion, &stbMeta->sversion, metaSize - sizeof(SCTableMeta));

  return TSDB_CODE_SUCCESS;
}


/*
 * Read the meta of the table named by ctx->pName from the cache.
 *
 * The db name is ctx->pName->dbname for system dbs (CTG_FLAG_IS_SYS_DB) and the
 * full db name from tNameGetFullDbName() otherwise.
 *
 * *pTableMeta receives a newly allocated copy built by ctgCopyTbMeta(), or stays
 * NULL when the table is not cached. It is also NULL when ctgCopyTbMeta() fails,
 * in which case the error code is returned. Nothing stays acquired on return.
 */
int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta **pTableMeta) {
  int32_t      code = 0;
  SCtgDBCache *db = NULL;
  SCtgTbCache *tb = NULL;
  char         dbFName[TSDB_DB_FNAME_LEN] = {0};

  *pTableMeta = NULL;

  if (!CTG_FLAG_IS_SYS_DB(ctx->flag)) {
    tNameGetFullDbName(ctx->pName, dbFName);
  } else {
    strcpy(dbFName, ctx->pName->dbname);
  }

  ctgAcquireTbMetaFromCache(pCtg, dbFName, ctx->pName->tname, &db, &tb);
  if (NULL == tb) {
    ctgReleaseTbMetaToCache(pCtg, db, tb);
    return TSDB_CODE_SUCCESS;
  }

  // ctgCopyTbMeta() may swap or clear db / tb; release whatever it leaves behind.
  CTG_ERR_JRET(ctgCopyTbMeta(pCtg, ctx, &db, &tb, pTableMeta, dbFName));

  ctgReleaseTbMetaToCache(pCtg, db, tb);

  ctgDebug("Got tb %s meta from cache, dbFName:%s", ctx->pName->tname, dbFName);

  return TSDB_CODE_SUCCESS;

_return:

  ctgReleaseTbMetaToCache(pCtg, db, tb);
  taosMemoryFreeClear(*pTableMeta);
  *pTableMeta = NULL;

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
568 569
/*
 * Read the schema version (*sver) and tag version (*tver) of a table from the
 * cache, together with its type (*tbType) and suid (*suid).
 *
 * *sver and *tver are preset to -1 and stay -1 when the needed meta is not cached.
 * For a child table the versions come from its super table's meta, and the super
 * table's cache key is copied into `stbName` as a NUL-terminated string (the
 * caller must provide a buffer large enough for it).
 *
 * Nothing stays acquired on return.
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CTG_INTERNAL_ERROR when the super table
 * entry's suid does not match the child's suid.
 */
int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver, int32_t *tver, int32_t *tbType,
                              uint64_t *suid, char *stbName) {
  *sver = -1;
  *tver = -1;

  SCtgDBCache *dbCache = NULL;
  SCtgTbCache *tbCache = NULL;
  char         dbFName[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, dbFName);

  ctgAcquireTbMetaFromCache(pCtg, dbFName, pTableName->tname, &dbCache, &tbCache);
  if (NULL == tbCache) {
    ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
    return TSDB_CODE_SUCCESS;
  }

  STableMeta *tbMeta = tbCache->pMeta;
  *tbType = tbMeta->tableType;
  *suid = tbMeta->suid;

  if (*tbType != TSDB_CHILD_TABLE) {
    *sver = tbMeta->sversion;
    *tver = tbMeta->tversion;

    ctgDebug("Got tb %s ver from cache, dbFName:%s, tbType:%d, sver:%d, tver:%d, suid:0x%" PRIx64, pTableName->tname,
             dbFName, *tbType, *sver, *tver, *suid);

    ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
    return TSDB_CODE_SUCCESS;
  }

  // PROCESS FOR CHILD TABLE

  // Release only the child table entry; the db entry is kept for the stb lookup.
  CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
  taosHashRelease(dbCache->tbCache, tbCache);
  tbCache = NULL;

  ctgDebug("Got ctb %s ver from cache, will continue to get its stb ver, dbFName:%s", pTableName->tname, dbFName);

  ctgAcquireStbMetaFromCache(dbCache, pCtg, dbFName, *suid, &tbCache);
  if (NULL == tbCache) {
    // The failed stb lookup already released the db entry; nothing left to release.
    ctgDebug("stb 0x%" PRIx64 " meta not in cache", *suid);
    return TSDB_CODE_SUCCESS;
  }

  STableMeta *stbMeta = tbCache->pMeta;
  if (stbMeta->suid != *suid) {
    // Log while the entry is still locked: stbMeta must not be read after release.
    ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid:0x%" PRIx64, stbMeta->suid, *suid);
    ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  // The hash key of the stb entry is the super table name.
  size_t nameLen = 0;
  char  *name = taosHashGetKey(tbCache, &nameLen);

  strncpy(stbName, name, nameLen);
  stbName[nameLen] = 0;

  *sver = stbMeta->sversion;
  *tver = stbMeta->tversion;

  ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);

  ctgDebug("Got tb %s sver %d tver %d from cache, type:%d, dbFName:%s", pTableName->tname, *sver, *tver, *tbType,
           dbFName);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
640
/*
 * Read the table type of `tbName` in database `dbFName` from the cache.
 * *tbType is written only when the table meta is cached; it is left untouched on
 * a miss. Nothing stays acquired on return.
 */
int32_t ctgReadTbTypeFromCache(SCatalog *pCtg, char *dbFName, char *tbName, int32_t *tbType) {
  SCtgDBCache *db = NULL;
  SCtgTbCache *tb = NULL;

  CTG_ERR_RET(ctgAcquireTbMetaFromCache(pCtg, dbFName, tbName, &db, &tb));

  if (NULL != tb) {
    *tbType = tb->pMeta->tableType;
    ctgReleaseTbMetaToCache(pCtg, db, tb);

    ctgDebug("Got tb %s tbType %d from cache, dbFName:%s", tbName, *tbType, dbFName);

    return TSDB_CODE_SUCCESS;
  }

  ctgReleaseTbMetaToCache(pCtg, db, tb);
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
657 658
/*
 * Read the index list of table `pTableName` from the cache.
 * *pRes is reset to NULL first and, on a hit, filled by ctgCloneTableIndex() from
 * the cached index array. Nothing stays acquired on return.
 * Returns TSDB_CODE_SUCCESS on a miss, otherwise the result of the clone.
 */
int32_t ctgReadTbIndexFromCache(SCatalog *pCtg, SName *pTableName, SArray **pRes) {
  int32_t      code = 0;
  SCtgDBCache *db = NULL;
  SCtgTbCache *tb = NULL;
  char         dbFName[TSDB_DB_FNAME_LEN] = {0};

  *pRes = NULL;

  tNameGetFullDbName(pTableName, dbFName);

  ctgAcquireTbIndexFromCache(pCtg, dbFName, pTableName->tname, &db, &tb);
  if (NULL == tb) {
    ctgReleaseTbIndexToCache(pCtg, db, tb);
    return TSDB_CODE_SUCCESS;
  }

  CTG_ERR_JRET(ctgCloneTableIndex(tb->pIndex->pIndex, pRes));

_return:

  // Reached on both success and failure of the clone.
  ctgReleaseTbIndexToCache(pCtg, db, tb);

  CTG_RET(code);
}

D
dapan1121 已提交
681 682
/*
 * Check a user's authorization using only cached user auth info.
 *
 * - System db: *inCache = true and pRes->pRawRes->pass = true, no lookup is done.
 * - User cached and ctgChkSetAuthRes() succeeds without reporting metaNotExists:
 *   *inCache = true and pRes holds the result.
 * - Otherwise (user not cached, check failed, or meta not exists): treated as a
 *   cache miss -- *inCache = false, numOfUserMiss is bumped and
 *   TSDB_CODE_SUCCESS is returned.
 */
int32_t ctgChkAuthFromCache(SCatalog *pCtg, SUserAuthInfo *pReq, bool *inCache, SCtgAuthRsp* pRes) {
  int32_t     code = TSDB_CODE_SUCCESS;
  SCtgAuthReq authReq = {0};

  if (IS_SYS_DBNAME(pReq->tbName.dbname)) {
    *inCache = true;
    pRes->pRawRes->pass = true;
    ctgDebug("sysdb %s, pass", pReq->tbName.dbname);
    return TSDB_CODE_SUCCESS;
  }

  SCtgUserAuth *cached = (SCtgUserAuth *)taosHashGet(pCtg->userCache, pReq->user, strlen(pReq->user));
  if (NULL == cached) {
    ctgDebug("user not in cache, user:%s", pReq->user);
    goto _return;
  }

  *inCache = true;

  ctgDebug("Got user from cache, user:%s", pReq->user);
  CTG_CACHE_STAT_INC(numOfUserHit, 1);

  authReq.pRawReq = pReq;
  authReq.onlyCache = true;

  // Snapshot the cached auth info and evaluate it while holding the user's read lock.
  CTG_LOCK(CTG_READ, &cached->lock);
  memcpy(&authReq.authInfo, &cached->userAuth, sizeof(cached->userAuth));
  code = ctgChkSetAuthRes(pCtg, &authReq, pRes);
  CTG_UNLOCK(CTG_READ, &cached->lock);

  CTG_ERR_JRET(code);

  if (!pRes->metaNotExists) {
    CTG_RET(code);
  }

_return:

  *inCache = false;
  CTG_CACHE_STAT_INC(numOfUserMiss, 1);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
724
/*
 * Pop the next operation from the global catalog queue.
 *
 * The current head node is released, its successor becomes the new head and
 * the successor's operation is handed back through *op. The successor is
 * dereferenced unconditionally, so the queue must hold at least one pending
 * node when this is called.
 */
void ctgDequeue(SCtgCacheOperation **op) {
  SCtgQNode *stale = gCtgMgmt.queue.head;
  SCtgQNode *next = stale->next;

  gCtgMgmt.queue.head = next;

  CTG_QUEUE_DEC();

  taosMemoryFreeClear(stale);

  *op = next->op;
}

dengyihao's avatar
dengyihao 已提交
737
/*
 * Append a cache operation to the global catalog queue and wake the consumer.
 *
 * pCtg       catalog handle (used for logging)
 * operation  heap-allocated operation; ownership moves to this function
 *
 * For a synchronous operation (operation->syncOp) the call blocks until the
 * operation's response semaphore is posted, then destroys the semaphore and
 * frees the operation. Unless operation->unLocked is set, the global read
 * lock is dropped while waiting and re-acquired afterwards.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the queue node
 * cannot be allocated (operation and its data are freed), or
 * TSDB_CODE_CTG_EXIT when the queue has already been stopped.
 */
int32_t ctgEnqueue(SCatalog *pCtg, SCtgCacheOperation *operation) {
  SCtgQNode *node = taosMemoryCalloc(1, sizeof(SCtgQNode));
  if (NULL == node) {
    qError("calloc %d failed", (int32_t)sizeof(SCtgQNode));
    taosMemoryFree(operation->data);
    taosMemoryFree(operation);
    CTG_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  // Cache what is needed after the node is published.
  // NOTE(review): presumably an async operation may be consumed and released
  // by the queue worker as soon as qlock is dropped - confirm against the worker.
  bool  syncOp = operation->syncOp;
  char *opName = gCtgCacheOperation[operation->opId].name;
  if (syncOp) {
    tsem_init(&operation->rspSem, 0, 0);
  }

  node->op = operation;

  CTG_LOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);

  if (gCtgMgmt.queue.stopQueue) {
    // The operation is rejected: release the semaphore created above before
    // the node is released (ctgFreeQNode presumably frees the op as well).
    if (syncOp) {
      tsem_destroy(&operation->rspSem);
    }
    ctgFreeQNode(node);
    CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);
    CTG_RET(TSDB_CODE_CTG_EXIT);
  }

  gCtgMgmt.queue.tail->next = node;
  gCtgMgmt.queue.tail = node;

  gCtgMgmt.queue.stopQueue = operation->stopQueue;

  CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);

  ctgDebug("action [%s] added into queue", opName);

  CTG_QUEUE_INC();
  CTG_RT_STAT_INC(numOfOpEnqueue, 1);

  tsem_post(&gCtgMgmt.queue.reqSem);

  if (syncOp) {
    if (!operation->unLocked) {
      CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock);
    }
    tsem_wait(&operation->rspSem);
    if (!operation->unLocked) {
      CTG_LOCK(CTG_READ, &gCtgMgmt.lock);
    }
    // The semaphore lives inside the operation: destroy it before the free.
    tsem_destroy(&operation->rspSem);
    taosMemoryFree(operation);
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
790 791
/*
 * Queue a synchronous CTG_OP_DROP_DB_CACHE operation for a database.
 *
 * pCtg     catalog handle
 * dbFName  full db name; when the part after the first '.' is a system db
 *          name, only that part is used
 * dbId     database id stored in the message
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY on allocation failure, otherwise the
 * result of ctgEnqueue.
 */
int32_t ctgDropDbCacheEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_DROP_DB_CACHE;
  op->syncOp = true;

  SCtgDropDBMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDBMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDBMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  // Strip the prefix before '.' when what follows is a system db name.
  const char *p = strchr(dbFName, '.');
  if (p && IS_SYS_DBNAME(p + 1)) {
    dbFName = p + 1;
  }

  msg->pCtg = pCtg;
  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  msg->dbId = dbId;

  op->data = msg;

  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
823 824
/*
 * Queue a CTG_OP_DROP_DB_VGROUP operation for a database.
 *
 * pCtg     catalog handle
 * dbFName  full db name; when the part after the first '.' is a system db
 *          name, only that part is used
 * syncOp   whether the caller waits for the operation to complete
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY on allocation failure, otherwise the
 * result of ctgEnqueue.
 */
int32_t ctgDropDbVgroupEnqueue(SCatalog *pCtg, const char *dbFName, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_DROP_DB_VGROUP;
  op->syncOp = syncOp;

  SCtgDropDbVgroupMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDbVgroupMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDbVgroupMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  // Strip the prefix before '.' when what follows is a system db name.
  const char *p = strchr(dbFName, '.');
  if (p && IS_SYS_DBNAME(p + 1)) {
    dbFName = p + 1;
  }

  msg->pCtg = pCtg;
  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));

  op->data = msg;

  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
855 856 857
/*
 * Queue a CTG_OP_DROP_STB_META operation for a super table.
 *
 * pCtg     catalog handle
 * dbFName  full db name copied into the message
 * dbId     database id
 * stbName  super table name copied into the message
 * suid     super table uid
 * syncOp   whether the caller waits for the operation to complete
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY on allocation failure, otherwise the
 * result of ctgEnqueue.
 */
int32_t ctgDropStbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid,
                              bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_DROP_STB_META;
  op->syncOp = syncOp;

  SCtgDropStbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropStbMetaMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropStbMetaMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  tstrncpy(msg->stbName, stbName, sizeof(msg->stbName));
  msg->dbId = dbId;
  msg->suid = suid;

  op->data = msg;

  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
886 887
/*
 * Queue a CTG_OP_DROP_TB_META operation for a table.
 *
 * pCtg     catalog handle
 * dbFName  full db name copied into the message
 * dbId     database id
 * tbName   table name copied into the message
 * syncOp   whether the caller waits for the operation to complete
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY on allocation failure, otherwise the
 * result of ctgEnqueue.
 */
int32_t ctgDropTbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_DROP_TB_META;
  op->syncOp = syncOp;

  SCtgDropTblMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTblMetaMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTblMetaMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  tstrncpy(msg->tbName, tbName, sizeof(msg->tbName));
  msg->dbId = dbId;

  op->data = msg;

  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
915 916
/*
 * Queue a CTG_OP_UPDATE_VGROUP operation carrying new vgroup info for a db.
 *
 * pCtg     catalog handle
 * dbFName  full db name; when the part after the first '.' is a system db
 *          name, only that part is used
 * dbId     database id
 * dbInfo   vgroup info; this function takes ownership and releases it with
 *          freeVgInfo on every failure path
 * syncOp   whether the caller waits for the operation to complete
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY on allocation failure, the ctgMakeVgArray
 * error if that step fails, otherwise the result of ctgEnqueue.
 */
int32_t ctgUpdateVgroupEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, SDBVgInfo *dbInfo, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    // Same ownership rule as the other failure paths: dbInfo is released here.
    freeVgInfo(dbInfo);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_UPDATE_VGROUP;
  op->syncOp = syncOp;

  SCtgUpdateVgMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateVgMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg));
    taosMemoryFree(op);
    freeVgInfo(dbInfo);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  // Strip the prefix before '.' when what follows is a system db name.
  const char *p = strchr(dbFName, '.');
  if (p && IS_SYS_DBNAME(p + 1)) {
    dbFName = p + 1;
  }

  code = ctgMakeVgArray(dbInfo);
  if (code) {
    taosMemoryFree(op);
    taosMemoryFree(msg);
    freeVgInfo(dbInfo);
    CTG_ERR_RET(code);
  }

  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  msg->pCtg = pCtg;
  msg->dbId = dbId;
  msg->dbInfo = dbInfo;

  op->data = msg;

  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  // Enqueue failed: only dbInfo is released here, op and msg are not touched.
  freeVgInfo(dbInfo);
  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
959 960
/*
 * Queue a CTG_OP_UPDATE_TB_META operation carrying new table meta.
 *
 * pCtg    catalog handle
 * output  table meta output; this function takes ownership and frees
 *         output->tbMeta and output on every failure path
 * syncOp  whether the caller waits for the operation to complete
 *
 * When the part of output->dbFName after the first '.' is a system db name,
 * dbFName is rewritten in place to hold only that part.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY on allocation failure, otherwise the
 * result of ctgEnqueue.
 */
int32_t ctgUpdateTbMetaEnqueue(SCatalog *pCtg, STableMetaOutput *output, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_UPDATE_TB_META;
  op->syncOp = syncOp;

  SCtgUpdateTbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbMetaMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbMetaMsg));
    taosMemoryFree(op);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  char *p = strchr(output->dbFName, '.');
  if (p && IS_SYS_DBNAME(p + 1)) {
    // Move the suffix together with its NUL terminator. Copying only
    // strlen() bytes would leave the tail of the old, longer string behind
    // it. The source lies inside the destination buffer, so len + 1 bytes
    // always fit.
    size_t len = strlen(p + 1);
    memmove(output->dbFName, p + 1, len + 1);
  }

  msg->pCtg = pCtg;
  msg->pMeta = output;

  op->data = msg;

  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  if (output) {
    taosMemoryFree(output->tbMeta);
    taosMemoryFree(output);
  }

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
997 998
/*
 * Queue a CTG_OP_UPDATE_VG_EPSET operation for one vgroup of a database.
 * The operation's syncOp flag is left at its zero-initialized value.
 *
 * pCtg     catalog handle
 * dbFName  full db name copied into the message
 * vgId     vgroup id
 * pEpSet   endpoint set, copied by value into the message
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY on allocation failure, otherwise the
 * result of ctgEnqueue.
 */
int32_t ctgUpdateVgEpsetEnqueue(SCatalog *pCtg, char *dbFName, int32_t vgId, SEpSet *pEpSet) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_UPDATE_VG_EPSET;

  SCtgUpdateEpsetMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateEpsetMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateEpsetMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  msg->vgId = vgId;
  msg->epSet = *pEpSet;

  op->data = msg;

  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1025 1026
/*
 * Queue a CTG_OP_UPDATE_USER operation carrying fresh user auth info.
 *
 * pCtg    catalog handle
 * pAuth   user auth response; its contents are copied by value into the
 *         message, and on every failure path they are released here with
 *         tFreeSGetUserAuthRsp
 * syncOp  whether the caller waits for the operation to complete
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY on allocation failure, otherwise the
 * result of ctgEnqueue.
 */
int32_t ctgUpdateUserEnqueue(SCatalog *pCtg, SGetUserAuthRsp *pAuth, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_UPDATE_USER;
  op->syncOp = syncOp;

  SCtgUpdateUserMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateUserMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateUserMsg));
    taosMemoryFree(op);
    // Jump to the shared cleanup so pAuth is released like on enqueue failure.
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  msg->userAuth = *pAuth;

  op->data = msg;

  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  tFreeSGetUserAuthRsp(pAuth);

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1054 1055
/*
 * Queue a CTG_OP_UPDATE_TB_INDEX operation carrying a table index.
 *
 * pCtg    catalog handle
 * pIndex  in/out: pointer to the table index to hand over. On success the
 *         index travels with the queued message and *pIndex is set to NULL;
 *         on failure the index is destroyed here and *pIndex is cleared.
 * syncOp  whether the caller waits for the operation to complete
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY on allocation failure, otherwise the
 * result of ctgEnqueue.
 */
int32_t ctgUpdateTbIndexEnqueue(SCatalog *pCtg, STableIndex **pIndex, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_UPDATE_TB_INDEX;
  op->syncOp = syncOp;

  SCtgUpdateTbIndexMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbIndexMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbIndexMsg));
    taosMemoryFree(op);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  msg->pIndex = *pIndex;

  op->data = msg;

  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  *pIndex = NULL;
  return TSDB_CODE_SUCCESS;

_return:

  // Failure: the index was not handed over, so release it here. The guard
  // avoids dereferencing a NULL *pIndex in the cleanup path.
  if (*pIndex) {
    taosArrayDestroyEx((*pIndex)->pIndex, tFreeSTableIndexInfo);
    taosMemoryFreeClear(*pIndex);
  }

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1085 1086
/*
 * Queue a CTG_OP_DROP_TB_INDEX operation for a table.
 *
 * pCtg    catalog handle
 * pName   table name; its full db name and tname are copied into the message
 * syncOp  whether the caller waits for the operation to complete
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY on allocation failure, otherwise the
 * result of ctgEnqueue.
 */
int32_t ctgDropTbIndexEnqueue(SCatalog *pCtg, SName *pName, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_DROP_TB_INDEX;
  op->syncOp = syncOp;

  SCtgDropTbIndexMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTbIndexMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTbIndexMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  tNameGetFullDbName(pName, msg->dbFName);
  // Bounded copy, matching the other enqueue helpers in this file.
  tstrncpy(msg->tbName, pName->tname, sizeof(msg->tbName));

  op->data = msg;

  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1113 1114
/*
 * Queue a CTG_OP_CLEAR_CACHE operation.
 *
 * pCtg       catalog handle stored in the message
 * freeCtg    stored in the message as msg->freeCtg
 * stopQueue  copied to op->stopQueue; ctgEnqueue publishes it as the
 *            queue's stop flag once the operation is queued
 * syncOp     whether the caller waits for the operation to complete
 *
 * The operation is marked unLocked, so ctgEnqueue does not touch the global
 * read lock while waiting for a synchronous operation.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY on allocation failure, otherwise the
 * result of ctgEnqueue.
 */
int32_t ctgClearCacheEnqueue(SCatalog *pCtg, bool freeCtg, bool stopQueue, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_CLEAR_CACHE;
  op->syncOp = syncOp;
  op->stopQueue = stopQueue;
  op->unLocked = true;

  SCtgClearCacheMsg *msg = taosMemoryMalloc(sizeof(SCtgClearCacheMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgClearCacheMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  msg->freeCtg = freeCtg;
  op->data = msg;

  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

D
dapan1121 已提交
1141 1142 1143 1144 1145 1146
/*
 * Initialize a rent manager: reset the read index, derive the slot count
 * from rentSec / CTG_RENT_SLOT_SECOND and allocate the zeroed slot array.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the slot array
 * cannot be allocated.
 */
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) {
  mgmt->slotRIdx = 0;
  mgmt->slotNum = rentSec / CTG_RENT_SLOT_SECOND;
  mgmt->type = type;

  size_t slotBytes = sizeof(SCtgRentSlot) * mgmt->slotNum;

  mgmt->slots = taosMemoryCalloc(1, slotBytes);
  if (NULL == mgmt->slots) {
    qError("calloc %d failed", (int32_t)slotBytes);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  qDebug("meta rent initialized, type:%d, slotNum:%d", type, mgmt->slotNum);

  return TSDB_CODE_SUCCESS;
}

/*
 * Append one meta record to the rent slot selected by |id % slotNum|.
 *
 * mgmt  rent manager
 * meta  record to copy into the slot array
 * id    record id, used to pick the slot
 * size  element size used when the slot array has to be created
 *
 * The slot array is created on first use and the slot is flagged as needing
 * a sort. Returns TSDB_CODE_OUT_OF_MEMORY when creating the array or
 * pushing the record fails.
 */
int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size) {
  int32_t       code = 0;
  int16_t       slotIdx = abs((int)(id % mgmt->slotNum));
  SCtgRentSlot *slot = &mgmt->slots[slotIdx];

  CTG_LOCK(CTG_WRITE, &slot->lock);

  if (NULL == slot->meta) {
    slot->meta = taosArrayInit(CTG_DEFAULT_RENT_SLOT_SIZE, size);
    if (NULL == slot->meta) {
      qError("taosArrayInit %d failed, id:0x%" PRIx64 ", slot idx:%d, type:%d", CTG_DEFAULT_RENT_SLOT_SIZE, id, slotIdx,
             mgmt->type);
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }
  }

  void *pushed = taosArrayPush(slot->meta, meta);
  if (NULL == pushed) {
    qError("taosArrayPush meta to rent failed, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  slot->needSort = true;

  qDebug("add meta to rent, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);

_return:

  CTG_UNLOCK(CTG_WRITE, &slot->lock);
  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1190 1191
/*
 * Overwrite the rent record that matches id, or add it when it is missing.
 *
 * mgmt           rent manager
 * meta           new record contents (size bytes are copied)
 * id             record id, used to pick the slot and as the search key
 * size           number of bytes copied over the existing record
 * sortCompare    comparator used to sort the slot when it is flagged unsorted
 * searchCompare  comparator used to look the id up in the sorted slot
 *
 * When the slot is empty or the id is not found, the slot lock is released
 * and the record is inserted through ctgMetaRentAdd; that call's result is
 * returned.
 */
int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t sortCompare,
                          __compar_fn_t searchCompare) {
  int32_t       code = 0;
  int16_t       slotIdx = abs((int)(id % mgmt->slotNum));
  SCtgRentSlot *slot = &mgmt->slots[slotIdx];

  CTG_LOCK(CTG_WRITE, &slot->lock);

  if (NULL == slot->meta) {
    qDebug("empty meta slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  // The lookup below needs a sorted slot.
  if (slot->needSort) {
    qDebug("meta slot before sorte, slot idx:%d, type:%d, size:%d", slotIdx, mgmt->type,
           (int32_t)taosArrayGetSize(slot->meta));
    taosArraySort(slot->meta, sortCompare);
    slot->needSort = false;
    qDebug("meta slot sorted, slot idx:%d, type:%d, size:%d", slotIdx, mgmt->type,
           (int32_t)taosArrayGetSize(slot->meta));
  }

  void *existing = taosArraySearch(slot->meta, &id, searchCompare, TD_EQ);
  if (NULL == existing) {
    qDebug("meta not found in slot, id:0x%" PRIx64 ", slot idx:%d, type:%d, size:%d", id, slotIdx, mgmt->type,
           (int32_t)taosArrayGetSize(slot->meta));
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  memcpy(existing, meta, size);

  qDebug("meta in rent updated, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);

_return:

  CTG_UNLOCK(CTG_WRITE, &slot->lock);

  if (TSDB_CODE_SUCCESS == code) {
    CTG_RET(code);
  }

  // In-place update was not possible: fall back to inserting the record.
  qDebug("meta in rent update failed, will try to add it, code:%x, id:0x%" PRIx64 ", slot idx:%d, type:%d", code, id,
         slotIdx, mgmt->type);
  CTG_RET(ctgMetaRentAdd(mgmt, meta, id, size));
}

/*
 * Remove the rent record that matches id from its slot.
 *
 * mgmt           rent manager
 * id             record id, used to pick the slot and as the search key
 * sortCompare    comparator used to sort the slot when it is flagged unsorted
 * searchCompare  comparator used to look the id up in the sorted slot
 *
 * Returns TSDB_CODE_CTG_INTERNAL_ERROR when the slot has no array or the id
 * is not found.
 */
int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t sortCompare, __compar_fn_t searchCompare) {
  int32_t       code = 0;
  int16_t       slotIdx = abs((int)(id % mgmt->slotNum));
  SCtgRentSlot *slot = &mgmt->slots[slotIdx];

  CTG_LOCK(CTG_WRITE, &slot->lock);

  if (NULL == slot->meta) {
    qError("empty meta slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  // The lookup below needs a sorted slot.
  if (slot->needSort) {
    taosArraySort(slot->meta, sortCompare);
    slot->needSort = false;
    qDebug("meta slot sorted, slot idx:%d, type:%d", slotIdx, mgmt->type);
  }

  int32_t pos = taosArraySearchIdx(slot->meta, &id, searchCompare, TD_EQ);
  if (pos < 0) {
    qError("meta not found in slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  taosArrayRemove(slot->meta, pos);

  qDebug("meta in rent removed, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);

_return:

  CTG_UNLOCK(CTG_WRITE, &slot->lock);

  CTG_RET(code);
}

/*
 * Copy out all records of the next rent slot.
 *
 * mgmt  rent manager; its read index is advanced and wrapped to slotNum
 * res   out: newly allocated copy of the slot's records, or NULL when the
 *       slot has nothing to return or on failure (the caller frees it)
 * num   out: number of records in *res, 0 when *res is NULL
 * size  size in bytes of a single record
 *
 * Returns TSDB_CODE_SUCCESS (also for an empty slot) or
 * TSDB_CODE_OUT_OF_MEMORY when the copy cannot be allocated.
 */
int32_t ctgMetaRentGetImpl(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
  // Give the caller well-defined outputs on every path, including the
  // empty-slot and allocation-failure exits below.
  *res = NULL;
  *num = 0;

  int16_t ridx = atomic_add_fetch_16(&mgmt->slotRIdx, 1);
  if (ridx >= mgmt->slotNum) {
    ridx %= mgmt->slotNum;
    atomic_store_16(&mgmt->slotRIdx, ridx);
  }

  SCtgRentSlot *slot = &mgmt->slots[ridx];
  int32_t       code = 0;

  CTG_LOCK(CTG_READ, &slot->lock);
  if (NULL == slot->meta) {
    qDebug("empty meta in slot:%d, type:%d", ridx, mgmt->type);
    goto _return;
  }

  size_t metaNum = taosArrayGetSize(slot->meta);
  if (0 == metaNum) {
    qDebug("no meta in slot:%d, type:%d", ridx, mgmt->type);
    goto _return;
  }

  size_t msize = metaNum * size;
  *res = taosMemoryMalloc(msize);
  if (NULL == *res) {
    qError("malloc %d failed", (int32_t)msize);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  // All msize bytes are copied in one go starting at element 0.
  void *meta = taosArrayGet(slot->meta, 0);

  memcpy(*res, meta, msize);

  *num = (uint32_t)metaNum;

  qDebug("Got %d meta from rent, type:%d", (int32_t)metaNum, mgmt->type);

_return:

  CTG_UNLOCK(CTG_READ, &slot->lock);

  CTG_RET(code);
}

/*
 * Rate-limited front end for ctgMetaRentGetImpl.
 *
 * When less than CTG_RENT_SLOT_SECOND seconds have passed since the last
 * recorded read, *res is set to NULL, *num to 0 and TSDB_CODE_SUCCESS is
 * returned. Otherwise the last-read timestamp is advanced with a
 * compare-and-exchange (retrying when it changed in between) and the
 * records of the next slot are fetched.
 */
int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
  for (;;) {
    int64_t nowMs = taosGetTimestampMs();
    int64_t lastMs = atomic_load_64(&mgmt->lastReadMsec);

    if ((nowMs - lastMs) < CTG_RENT_SLOT_SECOND * 1000) {
      *res = NULL;
      *num = 0;
      qDebug("too short time period to get expired meta, type:%d", mgmt->type);
      return TSDB_CODE_SUCCESS;
    }

    // Leave the loop only if the timestamp was still the value loaded above.
    if (lastMs == atomic_val_compare_exchange_64(&mgmt->lastReadMsec, lastMs, nowMs)) {
      break;
    }
  }

  CTG_ERR_RET(ctgMetaRentGetImpl(mgmt, res, num, size));

  return TSDB_CODE_SUCCESS;
}

/*
 * Create a new db cache entry and register it in pCtg->dbCache.
 *
 * pCtg     catalog handle
 * dbFName  full db name, used as the hash key
 * dbId     database id stored in the new entry
 *
 * The entry gets fresh tbCache and stbCache hash tables. Databases that are
 * not system dbs are additionally added to the db rent with vgVersion -1.
 * When the hash insert fails (including the "already exists" case) the
 * local entry is released with ctgFreeDbCache and the hash code is returned.
 */
int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
  int32_t     code = 0;
  SCtgDBCache newDBCache = {0};

  newDBCache.dbId = dbId;

  newDBCache.tbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY),
                                    true, HASH_ENTRY_LOCK);
  if (NULL == newDBCache.tbCache) {
    ctgError("taosHashInit %d metaCache failed", gCtgMgmt.cfg.maxTblCacheNum);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  newDBCache.stbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT),
                                     true, HASH_ENTRY_LOCK);
  if (NULL == newDBCache.stbCache) {
    ctgError("taosHashInit %d stbCache failed", gCtgMgmt.cfg.maxTblCacheNum);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  code = taosHashPut(pCtg->dbCache, dbFName, strlen(dbFName), &newDBCache, sizeof(SCtgDBCache));
  if (0 != code) {
    if (HASH_NODE_EXIST(code)) {
      ctgDebug("db already in cache, dbFName:%s", dbFName);
      goto _return;
    }

    ctgError("taosHashPut db to cache failed, dbFName:%s", dbFName);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  CTG_CACHE_STAT_INC(numOfDb, 1);

  SDbVgVersion vgVersion = {0};
  vgVersion.dbId = newDBCache.dbId;
  vgVersion.vgVersion = -1;
  vgVersion.stateTs = 0;
  tstrncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));

  ctgDebug("db added to cache, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId);

  // System databases are not tracked in the db rent.
  if (!IS_SYS_DBNAME(dbFName)) {
    CTG_ERR_RET(ctgMetaRentAdd(&pCtg->dbRent, &vgVersion, dbId, sizeof(SDbVgVersion)));

    ctgDebug("db added to rent, dbFName:%s, vgVersion:%d, dbId:0x%" PRIx64, dbFName, vgVersion.vgVersion, dbId);
  }

  return TSDB_CODE_SUCCESS;

_return:

  // The entry never made it into the hash: release the local copy.
  ctgFreeDbCache(&newDBCache);

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1392
/*
 * Remove every super table found in dbCache->stbCache from the stb rent.
 * The hash keys are read as suids; a removal that fails is skipped
 * without being reported.
 */
void ctgRemoveStbRent(SCatalog *pCtg, SCtgDBCache *dbCache) {
  if (NULL == dbCache->stbCache) {
    return;
  }

  for (void *pIter = taosHashIterate(dbCache->stbCache, NULL); NULL != pIter;
       pIter = taosHashIterate(dbCache->stbCache, pIter)) {
    uint64_t *suid = taosHashGetKey(pIter, NULL);

    int32_t removed = ctgMetaRentRemove(&pCtg->stbRent, *suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare);
    if (TSDB_CODE_SUCCESS == removed) {
      ctgDebug("stb removed from rent, suid:0x%" PRIx64, *suid);
    }
  }
}

dengyihao's avatar
dengyihao 已提交
1411
/*
 * Remove a database from the catalog cache.
 *
 * Under the db write lock the entry is flagged as deleted, its super tables
 * are dropped from the stb rent and its contents are released. After that
 * the db is removed from the db rent and from pCtg->dbCache.
 *
 * Returns the ctgMetaRentRemove error if the rent removal fails (the hash
 * entry is then left in place), TSDB_CODE_CTG_DB_DROPPED when the hash
 * removal fails, otherwise TSDB_CODE_SUCCESS.
 */
int32_t ctgRemoveDBFromCache(SCatalog *pCtg, SCtgDBCache *dbCache, const char *dbFName) {
  // Keep the id: the entry's contents are released below.
  uint64_t dbId = dbCache->dbId;

  ctgInfo("start to remove db from cache, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId);

  CTG_LOCK(CTG_WRITE, &dbCache->dbLock);

  atomic_store_8(&dbCache->deleted, 1);
  ctgRemoveStbRent(pCtg, dbCache);
  ctgFreeDbCache(dbCache);

  CTG_UNLOCK(CTG_WRITE, &dbCache->dbLock);

  CTG_ERR_RET(ctgMetaRentRemove(&pCtg->dbRent, dbId, ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare));
  ctgDebug("db removed from rent, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId);

  if (0 != taosHashRemove(pCtg->dbCache, dbFName, strlen(dbFName))) {
    ctgInfo("taosHashRemove from dbCache failed, may be removed, dbFName:%s", dbFName);
    CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
  }

  CTG_CACHE_STAT_DEC(numOfDb, 1);
  ctgInfo("db removed from cache, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1438 1439
/*
 * Look up the cache entry for dbFName, creating it when necessary.
 *
 * pCtg     catalog handle
 * dbFName  full db name
 * dbId     expected database id; 0 accepts whatever entry is cached
 * pCache   out: the cache entry as returned by ctgGetDBCache
 *
 * A cached entry whose id is 0 adopts dbId. A cached entry with a different
 * non-zero id is removed and replaced by a new one.
 */
int32_t ctgGetAddDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId, SCtgDBCache **pCache) {
  SCtgDBCache *dbCache = NULL;
  ctgGetDBCache(pCtg, dbFName, &dbCache);

  if (NULL != dbCache) {
    // TODO OPEN IT
#if 0    
    if (dbCache->dbId == dbId) {
      *pCache = dbCache;
      return TSDB_CODE_SUCCESS;
    }
#else
    // An entry without an id takes over the caller's non-zero id.
    if (0 != dbId && 0 == dbCache->dbId) {
      dbCache->dbId = dbId;
    }

    if (0 == dbId || dbCache->dbId == dbId) {
      *pCache = dbCache;
      return TSDB_CODE_SUCCESS;
    }
#endif
    // Id mismatch: drop the cached entry before adding a new one.
    CTG_ERR_RET(ctgRemoveDBFromCache(pCtg, dbCache, dbFName));
  }

  CTG_ERR_RET(ctgAddNewDBCache(pCtg, dbFName, dbId));

  ctgGetDBCache(pCtg, dbFName, &dbCache);

  *pCache = dbCache;

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1479 1480
// Record the current versions of a super table in the catalog's stb rent list.
//
// sversion/tversion are copied from pCache->pMeta and smaVer from pCache->pIndex when those
// pointers are set; fields without a source keep their zero initial value. The rent entry is
// keyed by suid. Errors from ctgMetaRentUpdate() are propagated through CTG_ERR_RET.
int32_t ctgUpdateRentStbVersion(SCatalog *pCtg, char *dbFName, char *tbName, uint64_t dbId, uint64_t suid,
                                SCtgTbCache *pCache) {
  SSTableVersion stbVer = {0};

  stbVer.dbId = dbId;
  stbVer.suid = suid;
  tstrncpy(stbVer.dbFName, dbFName, sizeof(stbVer.dbFName));
  tstrncpy(stbVer.stbName, tbName, sizeof(stbVer.stbName));

  if (NULL != pCache->pMeta) {
    stbVer.sversion = pCache->pMeta->sversion;
    stbVer.tversion = pCache->pMeta->tversion;
  }

  if (NULL != pCache->pIndex) {
    stbVer.smaVer = pCache->pIndex->version;
  }

  CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->stbRent, &stbVer, stbVer.suid, sizeof(SSTableVersion),
                                ctgStbVersionSortCompare, ctgStbVersionSearchCompare));

  ctgDebug("db %s,0x%" PRIx64 " stb %s,0x%" PRIx64 " sver %d tver %d smaVer %d updated to stbRent", dbFName, dbId,
           tbName, suid, stbVer.sversion, stbVer.tversion, stbVer.smaVer);

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
1502

dengyihao's avatar
dengyihao 已提交
1503 1504
// Store table meta in the db's table cache. `meta` is consumed on every path: it is either
// installed in the cache or released with taosMemoryFree().
//
//  - If the db's tbCache or stbCache is already gone, fails with TSDB_CODE_CTG_DB_DROPPED.
//  - If the cached meta describes the same table (type and uid) and is a child table, or is at
//    least as new in both sversion and tversion, the update is ignored.
//  - A cached super table is first removed from stbCache; a new super table is then registered in
//    stbCache under its suid and its versions are pushed to the stb rent list.
//
// `metaSize` is not used by this function.
int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, uint64_t dbId, char *tbName,
                              STableMeta *meta, int32_t metaSize) {
  if (NULL == dbCache->tbCache || NULL == dbCache->stbCache) {
    taosMemoryFree(meta);
    ctgError("db is dropping, dbId:0x%" PRIx64, dbCache->dbId);
    CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
  }

  const size_t nameLen = strlen(tbName);
  const bool   isStb = (TSDB_SUPER_TABLE == meta->tableType);
  SCtgTbCache *pCache = taosHashGet(dbCache->tbCache, tbName, nameLen);
  STableMeta  *orig = (NULL != pCache) ? pCache->pMeta : NULL;

  if (NULL != orig) {
    const int8_t origType = orig->tableType;

    // Keep this as one short-circuit expression: the version fields of `meta` must only be read
    // once the earlier comparisons have passed.
    if (origType == meta->tableType && orig->uid == meta->uid &&
        (origType == TSDB_CHILD_TABLE || (orig->sversion >= meta->sversion && orig->tversion >= meta->tversion))) {
      taosMemoryFree(meta);
      ctgDebug("ignore table %s meta update", tbName);
      return TSDB_CODE_SUCCESS;
    }

    if (TSDB_SUPER_TABLE == origType) {
      if (0 == taosHashRemove(dbCache->stbCache, &orig->suid, sizeof(orig->suid))) {
        CTG_CACHE_STAT_DEC(numOfStb, 1);
        ctgDebug("stb removed from stbCache, dbFName:%s, stb:%s, suid:0x%" PRIx64, dbFName, tbName, orig->suid);
      } else {
        ctgError("stb not exist in stbCache, dbFName:%s, stb:%s, suid:0x%" PRIx64, dbFName, tbName, orig->suid);
      }
    }
  }

  if (NULL == pCache) {
    // First time this table is seen: insert a fresh entry, then re-fetch the stored copy.
    SCtgTbCache newEntry = {0};
    newEntry.pMeta = meta;
    if (taosHashPut(dbCache->tbCache, tbName, nameLen, &newEntry, sizeof(SCtgTbCache)) != 0) {
      ctgError("taosHashPut new tbCache failed, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
      taosMemoryFree(meta);
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    pCache = taosHashGet(dbCache->tbCache, tbName, nameLen);
  } else {
    // Swap the meta pointer of the existing entry under its write lock.
    CTG_LOCK(CTG_WRITE, &pCache->metaLock);
    taosMemoryFree(pCache->pMeta);
    pCache->pMeta = meta;
    CTG_UNLOCK(CTG_WRITE, &pCache->metaLock);
  }

  // `orig` is only compared here, never dereferenced (it may have been freed just above).
  if (NULL == orig) {
    CTG_CACHE_STAT_INC(numOfTbl, 1);
  }

  ctgDebug("tbmeta updated to cache, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
  ctgdShowTableMeta(pCtg, tbName, meta);

  if (!isStb) {
    return TSDB_CODE_SUCCESS;
  }

  // Super tables are additionally indexed by suid -> table name (terminator included).
  if (taosHashPut(dbCache->stbCache, &meta->suid, sizeof(meta->suid), tbName, nameLen + 1) != 0) {
    ctgError("taosHashPut to stable cache failed, suid:0x%" PRIx64, meta->suid);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  CTG_CACHE_STAT_INC(numOfStb, 1);

  ctgDebug("stb 0x%" PRIx64 " updated to cache, dbFName:%s, tbName:%s, tbType:%d", meta->suid, dbFName, tbName,
           meta->tableType);

  if (NULL != pCache) {
    CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbId, meta->suid, pCache));
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1581
// Install *index as the cached index of table tbName.
//
// Ownership: on success the index belongs to the cache and *index is set to NULL. If the db is
// being dropped, or the new cache entry cannot be inserted, the index is freed here and *index is
// cleared. An error from the rent update is returned after the index has already been installed.
//
// When the new index carries suid 0 and an index is already cached, the cached suid is reused for
// the rent update. No rent update is made while suid is 0.
//
// NOTE: callers in this file pass dbFName/tbName that point into **index itself, so those strings
// must not be touched once *index has been freed.
int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, char *tbName, STableIndex **index) {
  if (NULL == dbCache->tbCache) {
    ctgFreeSTableIndex(*index);
    taosMemoryFreeClear(*index);
    ctgError("db is dropping, dbId:0x%" PRIx64, dbCache->dbId);
    CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
  }

  STableIndex *pIndex = *index;
  uint64_t     suid = pIndex->suid;
  SCtgTbCache *pCache = taosHashGet(dbCache->tbCache, tbName, strlen(tbName));
  if (NULL == pCache) {
    SCtgTbCache cache = {0};
    cache.pIndex = pIndex;

    if (taosHashPut(dbCache->tbCache, tbName, strlen(tbName), &cache, sizeof(cache)) != 0) {
      // Log first: tbName may live inside *index, which is released right below.
      ctgError("taosHashPut new tbCache failed, tbName:%s", tbName);
      ctgFreeSTableIndex(*index);
      taosMemoryFreeClear(*index);
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    *index = NULL;
    ctgDebug("table %s index updated to cache, ver:%d, num:%d", tbName, pIndex->version,
             (int32_t)taosArrayGetSize(pIndex->pIndex));

    if (suid) {
      CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbCache->dbId, pIndex->suid, &cache));
    }

    return TSDB_CODE_SUCCESS;
  }

  CTG_LOCK(CTG_WRITE, &pCache->indexLock);

  if (pCache->pIndex) {
    // Inherit the suid of the index being replaced when the new one has none.
    if (0 == suid) {
      suid = pCache->pIndex->suid;
    }
    taosArrayDestroyEx(pCache->pIndex->pIndex, tFreeSTableIndexInfo);
    taosMemoryFreeClear(pCache->pIndex);
  }

  pCache->pIndex = pIndex;
  CTG_UNLOCK(CTG_WRITE, &pCache->indexLock);

  *index = NULL;

  ctgDebug("table %s index updated to cache, ver:%d, num:%d", tbName, pIndex->version,
           (int32_t)taosArrayGetSize(pIndex->pIndex));

  if (suid) {
    CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbCache->dbId, suid, pCache));
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1639 1640 1641 1642
// Clone pOut and hand the clone to ctgUpdateTbMetaEnqueue().
//
// The local pointer is cleared right after the enqueue call, whatever its result, so the cleanup
// path below never frees the clone itself.
int32_t ctgUpdateTbMetaToCache(SCatalog *pCtg, STableMetaOutput *pOut, bool syncReq) {
  int32_t           code = 0;
  STableMetaOutput *pCloned = NULL;

  CTG_ERR_RET(ctgCloneMetaOutput(pOut, &pCloned));

  code = ctgUpdateTbMetaEnqueue(pCtg, pCloned, syncReq);
  pCloned = NULL;
  CTG_ERR_JRET(code);

  return TSDB_CODE_SUCCESS;

_return:

  ctgFreeSTableMetaOutput(pCloned);
  CTG_RET(code);
}

D
dapan1121 已提交
1656
void ctgClearAllInstance(void) {
dengyihao's avatar
dengyihao 已提交
1657
  SCatalog *pCtg = NULL;
1658

dengyihao's avatar
dengyihao 已提交
1659
  void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
1660
  while (pIter) {
dengyihao's avatar
dengyihao 已提交
1661
    pCtg = *(SCatalog **)pIter;
1662 1663

    if (pCtg) {
D
dapan1121 已提交
1664 1665 1666 1667 1668 1669 1670 1671
      ctgClearHandle(pCtg);
    }

    pIter = taosHashIterate(gCtgMgmt.pCluster, pIter);
  }
}

void ctgFreeAllInstance(void) {
dengyihao's avatar
dengyihao 已提交
1672
  SCatalog *pCtg = NULL;
D
dapan1121 已提交
1673

dengyihao's avatar
dengyihao 已提交
1674
  void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
D
dapan1121 已提交
1675
  while (pIter) {
dengyihao's avatar
dengyihao 已提交
1676
    pCtg = *(SCatalog **)pIter;
D
dapan1121 已提交
1677 1678 1679

    if (pCtg) {
      ctgFreeHandle(pCtg);
1680 1681 1682 1683 1684 1685 1686 1687
    }

    pIter = taosHashIterate(gCtgMgmt.pCluster, pIter);
  }

  taosHashClear(gCtgMgmt.pCluster);
}

1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701
// Search comparator: lp points to an int32_t vgroup id, rp to an SVgroupInfo element.
// Returns -1, 0 or 1 when the key is less than, equal to or greater than the element's vgId.
int32_t ctgVgInfoIdComp(void const* lp, void const* rp) {
  const int32_t      vgId = *(const int32_t *)lp;
  const SVgroupInfo *pVg = (const SVgroupInfo *)rp;

  if (vgId == pVg->vgId) {
    return 0;
  }

  return (vgId < pVg->vgId) ? -1 : 1;
}


D
dapan1121 已提交
1702
// Cache operation: install the vgroup info carried by the message into the db cache.
//
// The message is always freed before returning. msg->dbInfo is freed here as well unless it has
// been installed in the cache, in which case msg->dbInfo is cleared first.
//
// The update is dropped without error when the catalog has stopped updating, when the message has
// no vgHash, when the cached info is newer, or when version, table count and stateTs all match
// the cached info. After a successful install the db rent list is refreshed with the new version.
int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
  int32_t          code = 0;
  SCtgUpdateVgMsg *msg = operation->data;
  SDBVgInfo       *dbInfo = msg->dbInfo;
  char            *dbFName = msg->dbFName;
  SCatalog        *pCtg = msg->pCtg;

  if (pCtg->stopUpdate || NULL == dbInfo->vgHash) {
    goto _return;
  }

  if (dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) {
    ctgDebug("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d, vgHashSize:%d", dbFName, dbInfo->vgHash,
             dbInfo->vgVersion, taosHashGetSize(dbInfo->vgHash));
    CTG_ERR_JRET(TSDB_CODE_APP_ERROR);
  }

  // Snapshot the version fields now; dbInfo is handed over to the cache further down.
  SDbVgVersion vgVersion = {
      .dbId = msg->dbId, .vgVersion = dbInfo->vgVersion, .numOfTable = dbInfo->numOfTable, .stateTs = dbInfo->stateTs};

  SCtgDBCache *dbCache = NULL;
  CTG_ERR_JRET(ctgGetAddDBCache(msg->pCtg, dbFName, msg->dbId, &dbCache));
  if (NULL == dbCache) {
    ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:0x%" PRIx64, dbFName, msg->dbId);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  SCtgVgCache *vgCache = &dbCache->vgCache;
  CTG_ERR_JRET(ctgWLockVgInfo(msg->pCtg, dbCache));

  if (vgCache->vgInfo) {
    SDBVgInfo *vgInfo = vgCache->vgInfo;

    if (dbInfo->vgVersion < vgInfo->vgVersion) {
      ctgDebug("db updateVgroup is ignored, dbFName:%s, vgVer:%d, curVer:%d", dbFName, dbInfo->vgVersion,
               vgInfo->vgVersion);
      ctgWUnlockVgInfo(dbCache);

      goto _return;
    }

    if (dbInfo->vgVersion == vgInfo->vgVersion && dbInfo->numOfTable == vgInfo->numOfTable &&
        dbInfo->stateTs == vgInfo->stateTs) {
      ctgDebug("no new db vgroup update info, dbFName:%s, vgVer:%d, numOfTable:%d, stateTs:%" PRId64, dbFName,
               dbInfo->vgVersion, dbInfo->numOfTable, dbInfo->stateTs);
      ctgWUnlockVgInfo(dbCache);

      goto _return;
    }

    freeVgInfo(vgInfo);
  }

  // The cache owns dbInfo from here on; clear the message field so _return does not free it.
  vgCache->vgInfo = dbInfo;
  msg->dbInfo = NULL;

  ctgDebug("db vgInfo updated, dbFName:%s, vgVer:%d, stateTs:%" PRId64 ", dbId:0x%" PRIx64, dbFName,
           vgVersion.vgVersion, vgVersion.stateTs, vgVersion.dbId);

  ctgWUnlockVgInfo(dbCache);

  tstrncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
  CTG_ERR_JRET(ctgMetaRentUpdate(&msg->pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion),
                                 ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare));

_return:

  freeVgInfo(msg->dbInfo);
  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
1780
// Cache operation: remove a db from the catalog cache.
//
// Nothing is removed when the catalog has stopped updating, when the db is not cached, or when
// the message names a non-zero dbId that differs from the cached one. The message is always freed.
int32_t ctgOpDropDbCache(SCtgCacheOperation *operation) {
  int32_t        code = 0;
  SCtgDropDBMsg *msg = operation->data;
  SCatalog      *pCtg = msg->pCtg;
  SCtgDBCache   *dbCache = NULL;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  ctgGetDBCache(msg->pCtg, msg->dbFName, &dbCache);
  if (NULL == dbCache) {
    goto _return;
  }

  if (0 != msg->dbId && msg->dbId != dbCache->dbId) {
    ctgInfo("dbId already updated, dbFName:%s, dbId:0x%" PRIx64 ", targetId:0x%" PRIx64, msg->dbFName, dbCache->dbId,
            msg->dbId);
    goto _return;
  }

  CTG_ERR_JRET(ctgRemoveDBFromCache(pCtg, dbCache, msg->dbFName));

_return:

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
1810
// Cache operation: discard the cached vgroup info of a db while keeping the db entry itself.
//
// Does nothing when the catalog has stopped updating or the db is not cached. The vgroup info is
// freed and cleared between ctgWLockVgInfo() and ctgWUnlockVgInfo(). The message is always freed.
int32_t ctgOpDropDbVgroup(SCtgCacheOperation *operation) {
  int32_t              code = 0;
  SCtgDropDbVgroupMsg *msg = operation->data;
  SCatalog            *pCtg = msg->pCtg;
  SCtgDBCache         *dbCache = NULL;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  ctgGetDBCache(msg->pCtg, msg->dbFName, &dbCache);
  if (NULL == dbCache) {
    goto _return;
  }

  CTG_ERR_JRET(ctgWLockVgInfo(pCtg, dbCache));

  freeVgInfo(dbCache->vgCache.vgInfo);
  dbCache->vgCache.vgInfo = NULL;

  ctgDebug("db vgInfo removed, dbFName:%s", msg->dbFName);

  ctgWUnlockVgInfo(dbCache);

_return:

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
1841
// Cache operation: write the table meta carried by the message into the cache.
//
// For metaType TABLE or BOTH, pMeta->tbMeta is handed to ctgWriteTbMetaToCache() and the field is
// cleared. For CTABLE or BOTH, a heap copy of pMeta->ctbMeta is written under pMeta->ctbName.
// Whatever remains in pMeta->tbMeta, plus pMeta and the message, is freed before returning.
int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *operation) {
  int32_t              code = 0;
  SCtgUpdateTbMetaMsg *msg = operation->data;
  SCatalog            *pCtg = msg->pCtg;
  STableMetaOutput    *pMeta = msg->pMeta;
  SCtgDBCache         *dbCache = NULL;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  // Anything other than a pure child-table response must carry a full table meta.
  if (NULL == pMeta->tbMeta && !CTG_IS_META_CTABLE(pMeta->metaType)) {
    ctgError("no valid tbmeta got from meta rsp, dbFName:%s, tbName:%s", pMeta->dbFName, pMeta->tbName);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  // A combined response is expected to describe a super table.
  if (CTG_IS_META_BOTH(pMeta->metaType) && TSDB_SUPER_TABLE != pMeta->tbMeta->tableType) {
    ctgError("table type error, expected:%d, actual:%d", TSDB_SUPER_TABLE, pMeta->tbMeta->tableType);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  CTG_ERR_JRET(ctgGetAddDBCache(pCtg, pMeta->dbFName, pMeta->dbId, &dbCache));
  if (NULL == dbCache) {
    ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:0x%" PRIx64, pMeta->dbFName, pMeta->dbId);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  if (CTG_IS_META_TABLE(pMeta->metaType) || CTG_IS_META_BOTH(pMeta->metaType)) {
    code = ctgWriteTbMetaToCache(pCtg, dbCache, pMeta->dbFName, pMeta->dbId, pMeta->tbName, pMeta->tbMeta,
                                 CTG_META_SIZE(pMeta->tbMeta));
    // The callee consumed tbMeta whether it succeeded or not; do not free it again below.
    pMeta->tbMeta = NULL;
    CTG_ERR_JRET(code);
  }

  if (CTG_IS_META_CTABLE(pMeta->metaType) || CTG_IS_META_BOTH(pMeta->metaType)) {
    SCTableMeta *ctbCopy = taosMemoryMalloc(sizeof(SCTableMeta));
    if (NULL == ctbCopy) {
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }

    memcpy(ctbCopy, &pMeta->ctbMeta, sizeof(SCTableMeta));
    CTG_ERR_JRET(ctgWriteTbMetaToCache(pCtg, dbCache, pMeta->dbFName, pMeta->dbId, pMeta->ctbName,
                                       (STableMeta *)ctbCopy, sizeof(SCTableMeta)));
  }

_return:

  taosMemoryFreeClear(pMeta->tbMeta);
  taosMemoryFreeClear(pMeta);

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
1895
// Cache operation: drop a super table from the cache.
//
// Removes the suid entry from stbCache, frees and removes the table entry from tbCache, and
// finally removes the suid from the stb rent list. Nothing happens when the catalog has stopped
// updating, the db is not cached, or a non-zero msg->dbId differs from the cached dbId. The
// message is always freed.
int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) {
  int32_t             code = 0;
  SCtgDropStbMetaMsg *msg = operation->data;
  SCatalog           *pCtg = msg->pCtg;
  SCtgDBCache        *dbCache = NULL;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  ctgGetDBCache(pCtg, msg->dbFName, &dbCache);
  if (NULL == dbCache) {
    goto _return;
  }

  if (0 != msg->dbId && msg->dbId != dbCache->dbId) {
    ctgDebug("dbId already modified, dbFName:%s, current:0x%" PRIx64 ", dbId:0x%" PRIx64 ", stb:%s, suid:0x%" PRIx64,
             msg->dbFName, dbCache->dbId, msg->dbId, msg->stbName, msg->suid);
    goto _return;
  }

  if (0 == taosHashRemove(dbCache->stbCache, &msg->suid, sizeof(msg->suid))) {
    CTG_CACHE_STAT_DEC(numOfStb, 1);
  } else {
    ctgDebug("stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:0x%" PRIx64, msg->dbFName,
             msg->stbName, msg->suid);
  }

  const size_t stbNameLen = strlen(msg->stbName);
  SCtgTbCache *pTbCache = taosHashGet(dbCache->tbCache, msg->stbName, stbNameLen);
  if (NULL == pTbCache) {
    ctgDebug("stb %s already not in cache", msg->stbName);
    goto _return;
  }

  // Release what the entry holds under its write lock, then drop the entry itself.
  CTG_LOCK(CTG_WRITE, &pTbCache->metaLock);
  ctgFreeTbCacheImpl(pTbCache);
  CTG_UNLOCK(CTG_WRITE, &pTbCache->metaLock);

  if (0 == taosHashRemove(dbCache->tbCache, msg->stbName, stbNameLen)) {
    CTG_CACHE_STAT_DEC(numOfTbl, 1);
  } else {
    ctgError("stb not exist in cache, dbFName:%s, stb:%s, suid:0x%" PRIx64, msg->dbFName, msg->stbName, msg->suid);
  }

  ctgInfo("stb removed from cache, dbFName:%s, stbName:%s, suid:0x%" PRIx64, msg->dbFName, msg->stbName, msg->suid);

  CTG_ERR_JRET(ctgMetaRentRemove(&msg->pCtg->stbRent, msg->suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare));

  ctgDebug("stb removed from rent, dbFName:%s, stbName:%s, suid:0x%" PRIx64, msg->dbFName, msg->stbName, msg->suid);

_return:

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
1952
// Cache operation: drop one table from a db's table cache.
//
// Nothing happens when the catalog has stopped updating, the db is not cached, msg->dbId differs
// from the cached dbId, or the table is not cached. A failing removal from tbCache is reported
// as TSDB_CODE_CTG_INTERNAL_ERROR. The message is always freed.
int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) {
  int32_t             code = 0;
  SCtgDropTblMetaMsg *msg = operation->data;
  SCatalog           *pCtg = msg->pCtg;
  SCtgDBCache        *dbCache = NULL;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  ctgGetDBCache(pCtg, msg->dbFName, &dbCache);
  if (NULL == dbCache) {
    goto _return;
  }

  if (msg->dbId != dbCache->dbId) {
    ctgDebug("dbId 0x%" PRIx64 " not match with curId 0x%" PRIx64 ", dbFName:%s, tbName:%s", msg->dbId, dbCache->dbId,
             msg->dbFName, msg->tbName);
    goto _return;
  }

  const size_t tbNameLen = strlen(msg->tbName);
  SCtgTbCache *pTbCache = taosHashGet(dbCache->tbCache, msg->tbName, tbNameLen);
  if (NULL == pTbCache) {
    ctgDebug("tb %s already not in cache", msg->tbName);
    goto _return;
  }

  // Release what the entry holds under its write lock, then drop the entry itself.
  CTG_LOCK(CTG_WRITE, &pTbCache->metaLock);
  ctgFreeTbCacheImpl(pTbCache);
  CTG_UNLOCK(CTG_WRITE, &pTbCache->metaLock);

  if (taosHashRemove(dbCache->tbCache, msg->tbName, tbNameLen)) {
    ctgError("tb %s not exist in cache, dbFName:%s", msg->tbName, msg->dbFName);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  CTG_CACHE_STAT_DEC(numOfTbl, 1);

  ctgDebug("table %s removed from cache, dbFName:%s", msg->tbName, msg->dbFName);

_return:

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
1999
// Cache operation: install or refresh the cached authorization info of one user.
//
// New user: msg->userAuth is copied by value into a fresh cache entry, so the hash tables it
// points to are shared with that entry; only the message shell is freed afterwards.
// Known user: under the user's write lock each of the five hash tables (createdDbs, readDbs,
// writeDbs, readTbs, writeTbs) replaces the cached one, and the message field is cleared.
// Hash tables still referenced by the message at _return are cleaned up there.
int32_t ctgOpUpdateUser(SCtgCacheOperation *operation) {
  int32_t            code = 0;
  SCtgUpdateUserMsg *msg = operation->data;
  SCatalog          *pCtg = msg->pCtg;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  const size_t  userLen = strlen(msg->userAuth.user);
  SCtgUserAuth *pUser = (SCtgUserAuth *)taosHashGet(pCtg->userCache, msg->userAuth.user, userLen);

  if (NULL == pUser) {
    SCtgUserAuth newUser = {0};
    memcpy(&newUser.userAuth, &msg->userAuth, sizeof(msg->userAuth));

    if (taosHashPut(pCtg->userCache, msg->userAuth.user, userLen, &newUser, sizeof(newUser))) {
      ctgError("taosHashPut user %s to cache failed", msg->userAuth.user);
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }

    // The hash tables now belong to the cache entry; release only the message itself.
    taosMemoryFreeClear(msg);

    return TSDB_CODE_SUCCESS;
  }

  CTG_LOCK(CTG_WRITE, &pUser->lock);

  // db-level privileges
  taosHashCleanup(pUser->userAuth.createdDbs);
  pUser->userAuth.createdDbs = msg->userAuth.createdDbs;
  msg->userAuth.createdDbs = NULL;

  taosHashCleanup(pUser->userAuth.readDbs);
  pUser->userAuth.readDbs = msg->userAuth.readDbs;
  msg->userAuth.readDbs = NULL;

  taosHashCleanup(pUser->userAuth.writeDbs);
  pUser->userAuth.writeDbs = msg->userAuth.writeDbs;
  msg->userAuth.writeDbs = NULL;

  // table-level privileges
  taosHashCleanup(pUser->userAuth.readTbs);
  pUser->userAuth.readTbs = msg->userAuth.readTbs;
  msg->userAuth.readTbs = NULL;

  taosHashCleanup(pUser->userAuth.writeTbs);
  pUser->userAuth.writeTbs = msg->userAuth.writeTbs;
  msg->userAuth.writeTbs = NULL;

  CTG_UNLOCK(CTG_WRITE, &pUser->lock);

_return:

  taosHashCleanup(msg->userAuth.createdDbs);
  taosHashCleanup(msg->userAuth.readDbs);
  taosHashCleanup(msg->userAuth.writeDbs);
  taosHashCleanup(msg->userAuth.readTbs);
  taosHashCleanup(msg->userAuth.writeTbs);

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
2061
// Cache operation: overwrite the endpoint set of one vgroup in a db's cached vgroup info.
//
// The vgroup must be present both in vgInfo->vgHash and in vgInfo->vgArray; both copies get the
// new epSet. The update is skipped without error when the catalog has stopped updating, the db or
// its vgroup info is not cached, or the vgroup is missing from either container. The message is
// always freed.
int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation) {
  int32_t             code = 0;
  SCtgUpdateEpsetMsg *msg = operation->data;
  SCatalog           *pCtg = msg->pCtg;
  SCtgDBCache        *dbCache = NULL;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  CTG_ERR_JRET(ctgGetDBCache(pCtg, msg->dbFName, &dbCache));
  if (NULL == dbCache) {
    ctgDebug("db %s not exist, ignore epset update", msg->dbFName);
    goto _return;
  }

  CTG_ERR_JRET(ctgWLockVgInfo(pCtg, dbCache));

  // From here on the vgInfo write lock is held; every exit goes through _return with code == 0.
  SDBVgInfo *vgInfo = dbCache->vgCache.vgInfo;
  if (NULL == vgInfo) {
    ctgDebug("vgroup in db %s not cached, ignore epset update", msg->dbFName);
    goto _return;
  }

  SVgroupInfo *pHashVg = taosHashGet(vgInfo->vgHash, &msg->vgId, sizeof(msg->vgId));
  if (NULL == pHashVg) {
    ctgDebug("no vgroup %d in db %s vgHash, ignore epset update", msg->vgId, msg->dbFName);
    goto _return;
  }

  SVgroupInfo *pArrayVg = taosArraySearch(vgInfo->vgArray, &msg->vgId, ctgVgInfoIdComp, TD_EQ);
  if (NULL == pArrayVg) {
    ctgDebug("no vgroup %d in db %s vgArray, ignore epset update", msg->vgId, msg->dbFName);
    goto _return;
  }

  SEp *pOldEp = &pHashVg->epSet.eps[pHashVg->epSet.inUse];
  SEp *pNewEp = &msg->epSet.eps[msg->epSet.inUse];
  ctgDebug("vgroup %d epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d, dbFName:%s in ctg", pHashVg->vgId,
           pHashVg->epSet.inUse, pHashVg->epSet.numOfEps, pOldEp->fqdn, pOldEp->port, msg->epSet.inUse,
           msg->epSet.numOfEps, pNewEp->fqdn, pNewEp->port, msg->dbFName);

  pHashVg->epSet = msg->epSet;
  pArrayVg->epSet = msg->epSet;

_return:

  // Unlock only when a db cache was found and no error was recorded.
  // NOTE(review): this relies on ctgWLockVgInfo() not leaving the lock held when it fails - confirm.
  if (TSDB_CODE_SUCCESS == code && NULL != dbCache) {
    ctgWUnlockVgInfo(dbCache);
  }

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
2117
// Cache operation: install the table index carried by the message into the cache.
//
// The index is handed to ctgWriteTbIndexToCache(), which clears the local pointer when it takes
// or frees the index. Anything still referenced at _return is freed here, as is the message.
//
// ctgGetAddDBCache() can succeed and still yield no db cache; that case is reported as
// TSDB_CODE_CTG_INTERNAL_ERROR instead of passing a NULL db cache on.
int32_t ctgOpUpdateTbIndex(SCtgCacheOperation *operation) {
  int32_t               code = 0;
  SCtgUpdateTbIndexMsg *msg = operation->data;
  SCatalog             *pCtg = msg->pCtg;
  STableIndex          *pIndex = msg->pIndex;
  SCtgDBCache          *dbCache = NULL;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  CTG_ERR_JRET(ctgGetAddDBCache(pCtg, pIndex->dbFName, 0, &dbCache));
  if (NULL == dbCache) {
    ctgInfo("conflict db update, ignore this update, dbFName:%s", pIndex->dbFName);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  CTG_ERR_JRET(ctgWriteTbIndexToCache(pCtg, dbCache, pIndex->dbFName, pIndex->tbName, &pIndex));

_return:

  if (pIndex) {
    taosArrayDestroyEx(pIndex->pIndex, tFreeSTableIndexInfo);
    taosMemoryFreeClear(pIndex);
  }

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

// Cache operation: drop the cached index of a table.
//
// The drop is implemented by writing an empty placeholder index (version -1, no index array) over
// the cached one through ctgWriteTbIndexToCache(). Nothing is written when the catalog has
// stopped updating or the db is not cached. The message is always freed, and a placeholder that
// was not consumed by the cache is freed at _return.
int32_t ctgOpDropTbIndex(SCtgCacheOperation *operation) {
  int32_t             code = 0;
  SCtgDropTbIndexMsg *msg = operation->data;
  SCatalog           *pCtg = msg->pCtg;
  SCtgDBCache        *dbCache = NULL;
  // Initialised up front: _return inspects it and can be reached before the allocation below.
  STableIndex        *pIndex = NULL;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  CTG_ERR_JRET(ctgGetDBCache(pCtg, msg->dbFName, &dbCache));
  if (NULL == dbCache) {
    // Not an error, but the message still has to be released.
    goto _return;
  }

  pIndex = taosMemoryCalloc(1, sizeof(STableIndex));
  if (NULL == pIndex) {
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  tstrncpy(pIndex->tbName, msg->tbName, sizeof(pIndex->tbName));
  tstrncpy(pIndex->dbFName, msg->dbFName, sizeof(pIndex->dbFName));
  pIndex->version = -1;

  // Pass the names owned by the message: the callee may free pIndex and still use the names.
  CTG_ERR_JRET(ctgWriteTbIndexToCache(pCtg, dbCache, msg->dbFName, msg->tbName, &pIndex));

_return:

  if (pIndex) {
    taosArrayDestroyEx(pIndex->pIndex, tFreeSTableIndexInfo);
    taosMemoryFreeClear(pIndex);
  }

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
2181
// Cache operation: clear or free catalog handles while holding gCtgMgmt.lock for writing.
//
// With a catalog in the message only that handle is affected; otherwise every instance is.
// msg->freeCtg selects freeing (non-zero) over clearing (zero). The message is always freed.
int32_t ctgOpClearCache(SCtgCacheOperation *operation) {
  int32_t            code = 0;
  SCtgClearCacheMsg *msg = operation->data;
  SCatalog          *pCtg = msg->pCtg;

  CTG_LOCK(CTG_WRITE, &gCtgMgmt.lock);

  if (pCtg) {
    if (msg->freeCtg) {
      ctgFreeHandle(pCtg);
    } else {
      ctgClearHandle(pCtg);
    }
  } else if (msg->freeCtg) {
    ctgFreeAllInstance();
  } else {
    ctgClearAllInstance();
  }

  CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.lock);

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

2213 2214 2215 2216 2217 2218 2219 2220
/*
 * Release the payload (op->data) of a cache operation, together with whatever the payload owns,
 * according to op->opId.
 *
 * A NULL operation or a NULL payload is a no-op. The operation object itself is never released
 * here. For an unrecognised opId an error is logged and the payload is left untouched.
 */
void ctgFreeCacheOperationData(SCtgCacheOperation *op) {
  if (NULL == op || NULL == op->data) {
    return;
  }

  switch (op->opId) {
    case CTG_OP_UPDATE_VGROUP: {
      SCtgUpdateVgMsg *msg = op->data;
      freeVgInfo(msg->dbInfo);
      taosMemoryFreeClear(op->data);
      break;
    }
    case CTG_OP_UPDATE_TB_META: {
      SCtgUpdateTbMetaMsg *msg = op->data;
      // pMeta is dereferenced below, so guard it the same way pIndex is guarded for tbIndex updates
      if (msg->pMeta) {
        taosMemoryFreeClear(msg->pMeta->tbMeta);
        taosMemoryFreeClear(msg->pMeta);
      }
      taosMemoryFreeClear(op->data);
      break;
    }
    // payloads without nested allocations handled here: only the message itself is released
    case CTG_OP_DROP_DB_CACHE:
    case CTG_OP_DROP_DB_VGROUP:
    case CTG_OP_DROP_STB_META:
    case CTG_OP_DROP_TB_META:
    case CTG_OP_UPDATE_VG_EPSET:
    case CTG_OP_DROP_TB_INDEX:
    case CTG_OP_CLEAR_CACHE: {
      taosMemoryFreeClear(op->data);
      break;
    }
    case CTG_OP_UPDATE_USER: {
      SCtgUpdateUserMsg *msg = op->data;
      taosHashCleanup(msg->userAuth.createdDbs);
      taosHashCleanup(msg->userAuth.readDbs);
      taosHashCleanup(msg->userAuth.writeDbs);
      taosHashCleanup(msg->userAuth.readTbs);
      taosHashCleanup(msg->userAuth.writeTbs);
      taosMemoryFreeClear(op->data);
      break;
    }
    case CTG_OP_UPDATE_TB_INDEX: {
      SCtgUpdateTbIndexMsg *msg = op->data;
      if (msg->pIndex) {
        taosArrayDestroyEx(msg->pIndex->pIndex, tFreeSTableIndexInfo);
        taosMemoryFreeClear(msg->pIndex);
      }
      taosMemoryFreeClear(op->data);
      break;
    }
    default: {
      qError("invalid cache op id:%d", op->opId);
      break;
    }
  }
}

D
dapan1121 已提交
2268
void ctgCleanupCacheQueue(void) {
dengyihao's avatar
dengyihao 已提交
2269 2270
  SCtgQNode          *node = NULL;
  SCtgQNode          *nodeNext = NULL;
D
dapan1121 已提交
2271
  SCtgCacheOperation *op = NULL;
dengyihao's avatar
dengyihao 已提交
2272
  bool                stopQueue = false;
D
dapan1121 已提交
2273 2274 2275 2276 2277

  while (true) {
    node = gCtgMgmt.queue.head->next;
    while (node) {
      if (node->op) {
D
dapan1121 已提交
2278 2279 2280 2281 2282 2283
        op = node->op;
        if (op->stopQueue) {
          SCatalog *pCtg = ((SCtgUpdateMsgHeader *)op->data)->pCtg;
          ctgDebug("process [%s] operation", gCtgCacheOperation[op->opId].name);
          (*gCtgCacheOperation[op->opId].func)(op);
          stopQueue = true;
dengyihao's avatar
dengyihao 已提交
2284
          CTG_RT_STAT_INC(numOfOpDequeue, 1);
D
dapan1121 已提交
2285
        } else {
2286
          ctgFreeCacheOperationData(op);
dengyihao's avatar
dengyihao 已提交
2287
          CTG_RT_STAT_INC(numOfOpAbort, 1);
D
dapan1121 已提交
2288
        }
dengyihao's avatar
dengyihao 已提交
2289

D
dapan1121 已提交
2290 2291
        if (op->syncOp) {
          tsem_post(&op->rspSem);
D
dapan1121 已提交
2292
        } else {
D
dapan1121 已提交
2293
          taosMemoryFree(op);
D
dapan1121 已提交
2294
        }
D
dapan1121 已提交
2295
      }
D
dapan1121 已提交
2296 2297 2298

      nodeNext = node->next;
      taosMemoryFree(node);
dengyihao's avatar
dengyihao 已提交
2299

D
dapan1121 已提交
2300
      node = nodeNext;
D
dapan1121 已提交
2301 2302
    }

D
dapan1121 已提交
2303
    if (!stopQueue) {
D
dapan1121 已提交
2304 2305 2306 2307
      taosUsleep(1);
    } else {
      break;
    }
D
dapan1121 已提交
2308 2309 2310 2311 2312 2313
  }

  taosMemoryFreeClear(gCtgMgmt.queue.head);
  gCtgMgmt.queue.tail = NULL;
}

dengyihao's avatar
dengyihao 已提交
2314
void *ctgUpdateThreadFunc(void *param) {
D
dapan1121 已提交
2315
  setThreadName("catalog");
2316

D
dapan1121 已提交
2317 2318 2319 2320 2321 2322
  qInfo("catalog update thread started");

  while (true) {
    if (tsem_wait(&gCtgMgmt.queue.reqSem)) {
      qError("ctg tsem_wait failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
    }
dengyihao's avatar
dengyihao 已提交
2323

2324
    if (atomic_load_8((int8_t *)&gCtgMgmt.queue.stopQueue)) {
D
dapan1121 已提交
2325
      ctgCleanupCacheQueue();
D
dapan1121 已提交
2326 2327 2328
      break;
    }

D
dapan1121 已提交
2329 2330 2331
    SCtgCacheOperation *operation = NULL;
    ctgDequeue(&operation);
    SCatalog *pCtg = ((SCtgUpdateMsgHeader *)operation->data)->pCtg;
D
dapan1121 已提交
2332

D
dapan1121 已提交
2333
    ctgDebug("process [%s] operation", gCtgCacheOperation[operation->opId].name);
dengyihao's avatar
dengyihao 已提交
2334

D
dapan1121 已提交
2335
    (*gCtgCacheOperation[operation->opId].func)(operation);
D
dapan1121 已提交
2336

D
dapan1121 已提交
2337
    if (operation->syncOp) {
D
dapan1121 已提交
2338
      tsem_post(&operation->rspSem);
D
dapan1121 已提交
2339 2340
    } else {
      taosMemoryFreeClear(operation);
D
dapan1121 已提交
2341 2342
    }

dengyihao's avatar
dengyihao 已提交
2343
    CTG_RT_STAT_INC(numOfOpDequeue, 1);
D
dapan1121 已提交
2344

D
dapan1121 已提交
2345
    ctgdShowCacheInfo();
D
dapan1121 已提交
2346 2347 2348
  }

  qInfo("catalog update thread stopped");
dengyihao's avatar
dengyihao 已提交
2349

D
dapan1121 已提交
2350 2351 2352 2353 2354 2355 2356 2357 2358 2359 2360 2361
  return NULL;
}

/*
 * Create the joinable catalog update thread running ctgUpdateThreadFunc.
 *
 * Returns TSDB_CODE_SUCCESS on success, otherwise the system error derived from errno (also
 * stored in terrno). The thread attribute object is destroyed on every path.
 */
int32_t ctgStartUpdateThread() {
  int32_t      code = TSDB_CODE_SUCCESS;
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

  if (taosThreadCreate(&gCtgMgmt.updateThread, &thAttr, ctgUpdateThreadFunc, NULL) != 0) {
    // capture errno right away, before the attribute cleanup below gets a chance to change it
    code = TAOS_SYSTEM_ERROR(errno);
    terrno = code;
  }

  // the attribute object is only needed for thread creation; release it on failure as well
  taosThreadAttrDestroy(&thAttr);

  CTG_RET(code);
}

D
dapan1121 已提交
2367
/*
 * Look up one table's meta in the cache.
 *
 * On return *pTableMeta holds the cached meta only when the cached table type matches the type
 * requested in ctx->flag and either no forced update was requested or the table belongs to a
 * system database. Otherwise any meta that was read is released and *pTableMeta is NULL; in that
 * case an "unknown stb" flag is narrowed using ctx->tbInfo.tbType.
 * Returns the error of the cache read, otherwise TSDB_CODE_SUCCESS.
 */
int32_t ctgGetTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta **pTableMeta) {
  if (IS_SYS_DBNAME(ctx->pName->dbname)) {
    CTG_FLAG_SET_SYS_DB(ctx->flag);
  }

  CTG_ERR_RET(ctgReadTbMetaFromCache(pCtg, ctx, pTableMeta));

  if (NULL != *pTableMeta) {
    // unusable when the type does not match, or when a refresh is forced on a non-system db
    bool unusable = !CTG_FLAG_MATCH_STB(ctx->flag, (*pTableMeta)->tableType) ||
                    (CTG_FLAG_IS_FORCE_UPDATE(ctx->flag) && !CTG_FLAG_IS_SYS_DB(ctx->flag));
    if (!unusable) {
      return TSDB_CODE_SUCCESS;
    }

    taosMemoryFreeClear(*pTableMeta);
    *pTableMeta = NULL;
  }

  if (CTG_FLAG_IS_UNKNOWN_STB(ctx->flag)) {
    CTG_FLAG_SET_STB(ctx->flag, ctx->tbInfo.tbType);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
2391
#if 0
// Excluded from compilation by the surrounding #if 0.
// Reads the meta of every table named in ctx->pNames from the cache, appends one SMetaRes per
// table to ctx->pResList and records a SCtgFetch in ctx->pFetchs for each table without a usable
// cached meta. ctx->pResList is swapped into *pResList only when nothing needs fetching.
int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetasCtx* ctx, SArray** pResList) {
  int32_t tbNum = taosArrayGetSize(ctx->pNames);
  SName* fName = taosArrayGet(ctx->pNames, 0);
  int32_t fIdx = 0;

  for (int32_t i = 0; i < tbNum; ++i) {
    SCtgTbMetaCtx tbCtx = {0};
    tbCtx.flag = CTG_FLAG_UNKNOWN_STB;
    tbCtx.pName = taosArrayGet(ctx->pNames, i);

    if (IS_SYS_DBNAME(tbCtx.pName->dbname)) {
      CTG_FLAG_SET_SYS_DB(tbCtx.flag);
    }

    STableMeta *pTableMeta = NULL;
    CTG_ERR_RET(ctgReadTbMetaFromCache(pCtg, &tbCtx, &pTableMeta));
    SMetaRes metaRes = {0};

    if (pTableMeta) {
      if (CTG_FLAG_MATCH_STB(tbCtx.flag, pTableMeta->tableType) &&
          ((!CTG_FLAG_IS_FORCE_UPDATE(tbCtx.flag)) || (CTG_FLAG_IS_SYS_DB(tbCtx.flag)))) {
        metaRes.pRes = pTableMeta;
      } else {
        taosMemoryFreeClear(pTableMeta);
      }
    }

    if (NULL == metaRes.pRes) {
      // no usable cached meta: remember that this table still has to be fetched
      if (NULL == ctx->pFetchs) {
        ctx->pFetchs = taosArrayInit(tbNum, sizeof(SCtgFetch));
      }

      if (CTG_FLAG_IS_UNKNOWN_STB(tbCtx.flag)) {
        CTG_FLAG_SET_STB(tbCtx.flag, tbCtx.tbInfo.tbType);
      }

      SCtgFetch fetch = {0};
      fetch.tbIdx = i;
      fetch.fetchIdx = fIdx++;
      fetch.flag = tbCtx.flag;

      taosArrayPush(ctx->pFetchs, &fetch);
    }

    taosArrayPush(ctx->pResList, &metaRes);
  }

  if (NULL == ctx->pFetchs) {
    TSWAP(*pResList, ctx->pResList);
  }

  return TSDB_CODE_SUCCESS;
}
#endif

dengyihao's avatar
dengyihao 已提交
2448 2449 2450 2451 2452 2453 2454 2455 2456
/*
 * Resolve the meta of every table in pList (all taken to belong to the database of the first
 * entry) from the cache.
 *
 * For each table exactly one SMetaRes is appended to ctx->pResList: either a heap copy of the
 * table meta or an empty placeholder. Each placeholder is paired with a fetch request registered
 * through ctgAddFetch (dbIdx, table index, baseResIdx + index, lookup flag) so the caller can
 * retrieve the meta elsewhere. A child table needs both its own cache entry and the entry of its
 * super table; when the previous fully resolved child table had the same suid, its result is
 * cloned instead of looking the super table up again.
 *
 * pList     - array of SName; must hold at least one element (element 0 is read unconditionally)
 * pConn     - not used by this function
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when a meta copy cannot be allocated
 * (results appended so far stay in ctx->pResList).
 */
int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMetasCtx *ctx, int32_t dbIdx,
                               int32_t *fetchIdx, int32_t baseResIdx, SArray *pList) {
  int32_t     tbNum = taosArrayGetSize(pList);
  SName      *pName = taosArrayGet(pList, 0);
  char        dbFName[TSDB_DB_FNAME_LEN] = {0};
  int32_t     flag = CTG_FLAG_UNKNOWN_STB;
  uint64_t    lastSuid = 0;
  STableMeta *lastTableMeta = NULL;

  if (IS_SYS_DBNAME(pName->dbname)) {
    CTG_FLAG_SET_SYS_DB(flag);
    strcpy(dbFName, pName->dbname);
  } else {
    tNameGetFullDbName(pName, dbFName);
  }

  SCtgDBCache *dbCache = NULL;
  SCtgTbCache *pCache = NULL;
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);

  if (NULL == dbCache) {
    ctgDebug("db %s not in cache", dbFName);
    for (int32_t i = 0; i < tbNum; ++i) {
      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      // pResList elements are SMetaRes, so the placeholder must be one as well
      taosArrayPush(ctx->pResList, &(SMetaRes){0});
    }

    return TSDB_CODE_SUCCESS;
  }

  for (int32_t i = 0; i < tbNum; ++i) {
    pName = taosArrayGet(pList, i);

    pCache = taosHashAcquire(dbCache->tbCache, pName->tname, strlen(pName->tname));
    if (NULL == pCache) {
      ctgDebug("tb %s not in cache, dbFName:%s", pName->tname, dbFName);
      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      taosArrayPush(ctx->pResList, &(SMetaRes){0});

      continue;
    }

    CTG_LOCK(CTG_READ, &pCache->metaLock);
    if (NULL == pCache->pMeta) {
      CTG_UNLOCK(CTG_READ, &pCache->metaLock);
      // drop the reference taken by taosHashAcquire above, as every other exit of this loop does
      taosHashRelease(dbCache->tbCache, pCache);

      ctgDebug("tb %s meta not in cache, dbFName:%s", pName->tname, dbFName);
      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      taosArrayPush(ctx->pResList, &(SMetaRes){0});

      continue;
    }

    STableMeta *tbMeta = pCache->pMeta;

    // snapshot what is needed from the cached meta while the read lock is held
    SCtgTbMetaCtx nctx = {0};
    nctx.flag = flag;
    nctx.tbInfo.inCache = true;
    nctx.tbInfo.dbId = dbCache->dbId;
    nctx.tbInfo.suid = tbMeta->suid;
    nctx.tbInfo.tbType = tbMeta->tableType;

    SMetaRes    res = {0};
    STableMeta *pTableMeta = NULL;
    if (tbMeta->tableType != TSDB_CHILD_TABLE) {
      // not a child table: the cached entry is the complete meta, copy it as is
      int32_t metaSize = CTG_META_SIZE(tbMeta);
      pTableMeta = taosMemoryCalloc(1, metaSize);
      if (NULL == pTableMeta) {
        // NOTE(review): ctgReleaseTbMetaToCache presumably drops the meta lock, the table entry
        // reference and the db cache reference in one call - confirm against its definition
        ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
        CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
      }

      memcpy(pTableMeta, tbMeta, metaSize);

      CTG_UNLOCK(CTG_READ, &pCache->metaLock);
      taosHashRelease(dbCache->tbCache, pCache);

      // log from the snapshot: the cache entry must not be read after lock and reference are gone
      ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pName->tname, nctx.tbInfo.tbType, dbFName);

      res.pRes = pTableMeta;
      taosArrayPush(ctx->pResList, &res);

      continue;
    }

    // PROCESS FOR CHILD TABLE

    if (lastSuid && tbMeta->suid == lastSuid && lastTableMeta) {
      // same super table as the previous resolved child table: clone that result and overwrite
      // only the child-table specific leading part
      cloneTableMeta(lastTableMeta, &pTableMeta);
      if (NULL == pTableMeta) {
        ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
        CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
      }

      memcpy(pTableMeta, tbMeta, sizeof(SCTableMeta));

      CTG_UNLOCK(CTG_READ, &pCache->metaLock);
      taosHashRelease(dbCache->tbCache, pCache);

      ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pName->tname, nctx.tbInfo.tbType, dbFName);

      res.pRes = pTableMeta;
      taosArrayPush(ctx->pResList, &res);

      continue;
    }

    // start with the child-table part only; the super table part is appended further down
    int32_t metaSize = sizeof(SCTableMeta);
    pTableMeta = taosMemoryCalloc(1, metaSize);
    if (NULL == pTableMeta) {
      ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    memcpy(pTableMeta, tbMeta, metaSize);

    CTG_UNLOCK(CTG_READ, &pCache->metaLock);
    taosHashRelease(dbCache->tbCache, pCache);

    ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", pName->tname,
             nctx.tbInfo.tbType, dbFName);

    // map the suid to the super table name, then look that table up in the table cache
    char *stName = taosHashAcquire(dbCache->stbCache, &pTableMeta->suid, sizeof(pTableMeta->suid));
    if (NULL == stName) {
      ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", pTableMeta->suid, dbFName);
      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      taosArrayPush(ctx->pResList, &(SMetaRes){0});

      taosMemoryFreeClear(pTableMeta);
      continue;
    }

    pCache = taosHashAcquire(dbCache->tbCache, stName, strlen(stName));
    if (NULL == pCache) {
      ctgDebug("stb 0x%" PRIx64 " name %s not in cache, dbFName:%s", pTableMeta->suid, stName, dbFName);
      taosHashRelease(dbCache->stbCache, stName);

      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      taosArrayPush(ctx->pResList, &(SMetaRes){0});

      taosMemoryFreeClear(pTableMeta);
      continue;
    }

    taosHashRelease(dbCache->stbCache, stName);

    CTG_LOCK(CTG_READ, &pCache->metaLock);
    if (NULL == pCache->pMeta) {
      ctgDebug("stb 0x%" PRIx64 " meta not in cache, dbFName:%s", pTableMeta->suid, dbFName);
      CTG_UNLOCK(CTG_READ, &pCache->metaLock);
      taosHashRelease(dbCache->tbCache, pCache);

      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      taosArrayPush(ctx->pResList, &(SMetaRes){0});

      taosMemoryFreeClear(pTableMeta);

      continue;
    }

    STableMeta *stbMeta = pCache->pMeta;
    if (stbMeta->suid != nctx.tbInfo.suid) {
      CTG_UNLOCK(CTG_READ, &pCache->metaLock);
      taosHashRelease(dbCache->tbCache, pCache);

      ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%" PRIx64, stbMeta->suid,
               nctx.tbInfo.suid);

      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      taosArrayPush(ctx->pResList, &(SMetaRes){0});

      taosMemoryFreeClear(pTableMeta);

      continue;
    }

    // grow the copy to full size; go through a temporary so the buffer is not lost on failure
    metaSize = CTG_META_SIZE(stbMeta);
    STableMeta *pFullMeta = taosMemoryRealloc(pTableMeta, metaSize);
    if (NULL == pFullMeta) {
      taosMemoryFreeClear(pTableMeta);
      ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    pTableMeta = pFullMeta;

    // everything from sversion onwards is taken from the super table meta
    memcpy(&pTableMeta->sversion, &stbMeta->sversion, metaSize - sizeof(SCTableMeta));

    CTG_UNLOCK(CTG_READ, &pCache->metaLock);
    taosHashRelease(dbCache->tbCache, pCache);

    res.pRes = pTableMeta;
    taosArrayPush(ctx->pResList, &res);

    // remember this result so following child tables of the same super table can clone it
    lastSuid = pTableMeta->suid;
    lastTableMeta = pTableMeta;
  }

  ctgReleaseDBCache(pCtg, dbCache);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2642
/*
 * Queue the removal of a table's meta from the cache.
 *
 * The meta is first read from the cache; when it is not there nothing is queued and success is
 * returned. Otherwise a "drop stb meta" operation is queued for a super table and a "drop tb
 * meta" operation for any other table type, with syncReq passed through to the enqueue call.
 * The copy of the meta read here is released before returning.
 */
int32_t ctgRemoveTbMetaFromCache(SCatalog *pCtg, SName *pTableName, bool syncReq) {
  int32_t       code = 0;
  STableMeta   *cachedMeta = NULL;
  char          dbFName[TSDB_DB_FNAME_LEN];
  SCtgTbMetaCtx tbCtx = {0};

  tbCtx.pName = pTableName;
  tbCtx.flag = CTG_FLAG_UNKNOWN_STB;

  CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &tbCtx, &cachedMeta));

  if (NULL == cachedMeta) {
    ctgDebug("table already not in cache, db:%s, tblName:%s", pTableName->dbname, pTableName->tname);
    return TSDB_CODE_SUCCESS;
  }

  tNameGetFullDbName(pTableName, dbFName);

  if (TSDB_SUPER_TABLE != cachedMeta->tableType) {
    CTG_ERR_JRET(ctgDropTbMetaEnqueue(pCtg, dbFName, tbCtx.tbInfo.dbId, pTableName->tname, syncReq));
  } else {
    CTG_ERR_JRET(ctgDropStbMetaEnqueue(pCtg, dbFName, tbCtx.tbInfo.dbId, pTableName->tname, cachedMeta->suid, syncReq));
  }

_return:

  taosMemoryFreeClear(cachedMeta);

  CTG_RET(code);
}

/*
 * Determine, from cached vgroup info only, the vgroup a table hashes to.
 *
 * On success *pVgroup is either NULL (the database's vgroup info is not cached) or a newly
 * allocated SVgroupInfo that the caller must free. On failure the allocation made here is
 * released and *pVgroup is cleared.
 *
 * Returns TSDB_CODE_CTG_INVALID_INPUT for tables of a system database,
 * TSDB_CODE_OUT_OF_MEMORY when the result cannot be allocated, or the error reported by the
 * cache/hash lookup helpers.
 */
int32_t ctgGetTbHashVgroupFromCache(SCatalog *pCtg, const SName *pTableName, SVgroupInfo **pVgroup) {
  if (IS_SYS_DBNAME(pTableName->dbname)) {
    ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname);
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  SCtgDBCache *dbCache = NULL;
  int32_t      code = 0;
  char         dbFName[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, dbFName);

  CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache));

  if (NULL == dbCache) {
    *pVgroup = NULL;
    return TSDB_CODE_SUCCESS;
  }

  *pVgroup = taosMemoryCalloc(1, sizeof(SVgroupInfo));
  if (NULL == *pVgroup) {
    // do not hand a NULL output buffer to the hash lookup; the vgroup info is released below
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pTableName, *pVgroup));

_return:

  if (dbCache) {
    ctgReleaseVgInfoToCache(pCtg, dbCache);
  }

  if (code) {
    taosMemoryFreeClear(*pVgroup);
  }

  CTG_RET(code);
}