ctgCache.c 74.6 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "catalogInt.h"
dengyihao's avatar
dengyihao 已提交
17
#include "query.h"
D
dapan1121 已提交
18
#include "systable.h"
dengyihao's avatar
dengyihao 已提交
19 20
#include "tname.h"
#include "trpc.h"
D
dapan1121 已提交
21

dengyihao's avatar
dengyihao 已提交
22 23 24 25 26 27 28 29 30 31 32
// Descriptor table for the catalog cache operations. Each entry lists a CTG_OP_* id, a
// human-readable name and a ctgOp* function (presumably the handler executed for that
// operation -- confirm against the SCtgOperation definition).
// ctgEnqueue() indexes this table directly with operation->opId to fetch the name, so
// every entry is expected to sit at the index equal to its CTG_OP_* value
// (NOTE(review): inferred from that lookup; verify against the enum order).
SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = {{CTG_OP_UPDATE_VGROUP, "update vgInfo", ctgOpUpdateVgroup},
                                                {CTG_OP_UPDATE_TB_META, "update tbMeta", ctgOpUpdateTbMeta},
                                                {CTG_OP_DROP_DB_CACHE, "drop DB", ctgOpDropDbCache},
                                                {CTG_OP_DROP_DB_VGROUP, "drop DBVgroup", ctgOpDropDbVgroup},
                                                {CTG_OP_DROP_STB_META, "drop stbMeta", ctgOpDropStbMeta},
                                                {CTG_OP_DROP_TB_META, "drop tbMeta", ctgOpDropTbMeta},
                                                {CTG_OP_UPDATE_USER, "update user", ctgOpUpdateUser},
                                                {CTG_OP_UPDATE_VG_EPSET, "update epset", ctgOpUpdateEpset},
                                                {CTG_OP_UPDATE_TB_INDEX, "update tbIndex", ctgOpUpdateTbIndex},
                                                {CTG_OP_DROP_TB_INDEX, "drop tbIndex", ctgOpDropTbIndex},
                                                {CTG_OP_CLEAR_CACHE, "clear cache", ctgOpClearCache}};
D
dapan1121 已提交
33

D
dapan1121 已提交
34 35
/*
 * Lock the db's vgroup cache in CTG_READ mode and report through *inCache
 * whether vgroup info is available.
 *
 * *inCache == true : vgInfo is present; the lock is still held on return and
 *                    must be dropped later (see ctgRUnlockVgInfo).
 * *inCache == false: the db is flagged as deleted or has no vgInfo; the lock
 *                    has already been dropped again.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgRLockVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache, bool *inCache) {
  CTG_LOCK(CTG_READ, &dbCache->vgCache.vgLock);

  // Sample the flag once while the lock is held; vgInfo is only inspected for a live db.
  const bool dropping = dbCache->deleted;
  if (!dropping && NULL != dbCache->vgCache.vgInfo) {
    *inCache = true;
    return TSDB_CODE_SUCCESS;
  }

  // Nothing usable: give the lock back before reporting the reason.
  CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock);
  *inCache = false;

  if (dropping) {
    ctgDebug("db is dropping, dbId:0x%" PRIx64, dbCache->dbId);
  } else {
    ctgDebug("db vgInfo is empty, dbId:0x%" PRIx64, dbCache->dbId);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
59 60
/*
 * Lock the db's vgroup cache in CTG_WRITE mode.
 *
 * Returns TSDB_CODE_SUCCESS with the lock held. If the db is flagged as
 * deleted the lock is dropped again and TSDB_CODE_CTG_DB_DROPPED is passed to
 * CTG_ERR_RET.
 */
int32_t ctgWLockVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache) {
  CTG_LOCK(CTG_WRITE, &dbCache->vgCache.vgLock);

  if (!dbCache->deleted) {
    return TSDB_CODE_SUCCESS;
  }

  // The db is going away: do not hand out a write lock on it.
  ctgDebug("db is dropping, dbId:0x%" PRIx64, dbCache->dbId);
  CTG_UNLOCK(CTG_WRITE, &dbCache->vgCache.vgLock);
  CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
71
/* Drop a CTG_READ hold on the db's vgroup-cache lock (counterpart of a successful ctgRLockVgInfo). */
void ctgRUnlockVgInfo(SCtgDBCache *dbCache) {
  CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock);
}
D
dapan1121 已提交
72

dengyihao's avatar
dengyihao 已提交
73
/* Drop a CTG_WRITE hold on the db's vgroup-cache lock (counterpart of a successful ctgWLockVgInfo). */
void ctgWUnlockVgInfo(SCtgDBCache *dbCache) {
  CTG_UNLOCK(CTG_WRITE, &dbCache->vgCache.vgLock);
}
D
dapan1121 已提交
74

dengyihao's avatar
dengyihao 已提交
75 76
/*
 * Give back a db cache entry obtained through ctgAcquireDBCache(): the
 * CTG_READ hold on dbCache->dbLock is dropped first, then the reference taken
 * on the entry in pCtg->dbCache is released.
 */
void ctgReleaseDBCache(SCatalog *pCtg, SCtgDBCache *dbCache) {
  CTG_UNLOCK(CTG_READ, &dbCache->dbLock);

  taosHashRelease(pCtg->dbCache, dbCache);
}
D
dapan1121 已提交
79

dengyihao's avatar
dengyihao 已提交
80
/*
 * Look up the cache entry of db dbFName in pCtg->dbCache.
 *
 * acquire == true : the entry is fetched with taosHashAcquire() and its
 *                   dbLock is taken in CTG_READ mode; the caller later gives
 *                   both back with ctgReleaseDBCache().
 * acquire == false: the entry is fetched with taosHashGet() and no lock is
 *                   taken.
 *
 * *pCache is set to the entry, or to NULL when the db is not cached or is
 * flagged as deleted (in which case anything acquired here is released
 * again). Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgAcquireDBCacheImpl(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache, bool acquire) {
  // When the part after the first '.' names a system db, that part alone is used as the key.
  char *dot = strchr(dbFName, '.');
  if (dot && IS_SYS_DBNAME(dot + 1)) {
    dbFName = dot + 1;
  }

  SCtgDBCache *entry = (SCtgDBCache *)(acquire ? taosHashAcquire(pCtg->dbCache, dbFName, strlen(dbFName))
                                               : taosHashGet(pCtg->dbCache, dbFName, strlen(dbFName)));
  if (NULL == entry) {
    ctgDebug("db not in cache, dbFName:%s", dbFName);
    *pCache = NULL;
    return TSDB_CODE_SUCCESS;
  }

  if (acquire) {
    CTG_LOCK(CTG_READ, &entry->dbLock);
  }

  if (!entry->deleted) {
    *pCache = entry;
    return TSDB_CODE_SUCCESS;
  }

  // The db is being removed: undo the acquire (if any) and report "not cached".
  if (acquire) {
    ctgReleaseDBCache(pCtg, entry);
  }

  *pCache = NULL;
  ctgDebug("db is removing from cache, dbFName:%s", dbFName);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
119
/*
 * Acquiring flavour of ctgAcquireDBCacheImpl(): on a hit the caller owns a
 * reference on the entry plus a CTG_READ hold on its dbLock and must call
 * ctgReleaseDBCache() when done. *pCache is NULL on a miss.
 */
int32_t ctgAcquireDBCache(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache) {
  const bool acquire = true;

  CTG_RET(ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, acquire));
}

dengyihao's avatar
dengyihao 已提交
123
/*
 * Non-acquiring flavour of ctgAcquireDBCacheImpl(): the entry is only looked
 * up with taosHashGet(), no dbLock is taken, so there is nothing to release
 * afterwards. *pCache is NULL on a miss.
 */
int32_t ctgGetDBCache(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache) {
  const bool acquire = false;

  CTG_RET(ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, acquire));
}

dengyihao's avatar
dengyihao 已提交
127
/*
 * Undo a successful ctgAcquireVgInfoFromCache(): drop the vgroup read lock
 * first, then the db lock and reference.
 */
void ctgReleaseVgInfoToCache(SCatalog *pCtg, SCtgDBCache *dbCache) {
  ctgRUnlockVgInfo(dbCache);

  ctgReleaseDBCache(pCtg, dbCache);
}
D
dapan1121 已提交
131

dengyihao's avatar
dengyihao 已提交
132
/*
 * Release what a table-meta acquire handed out.
 *
 * With a NULL dbCache nothing is done at all (the table entry cannot be given
 * back to its hash without the owning db entry). Otherwise the table entry,
 * if any, has its metaLock CTG_READ hold dropped and its reference released,
 * and finally the db entry itself is released.
 */
void ctgReleaseTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache) {
  if (NULL == dbCache) {
    return;
  }

  if (NULL != pCache) {
    CTG_UNLOCK(CTG_READ, &pCache->metaLock);
    taosHashRelease(dbCache->tbCache, pCache);
  }

  ctgReleaseDBCache(pCtg, dbCache);
}
D
dapan1121 已提交
142

dengyihao's avatar
dengyihao 已提交
143
/*
 * Release what ctgAcquireTbIndexFromCache() handed out.
 *
 * pCache  - table cache entry whose indexLock is held in CTG_READ mode, or NULL.
 * dbCache - owning db cache entry, or NULL.
 *
 * The index lock is always dropped when pCache is given. The reference on the
 * table entry can only be returned through dbCache->tbCache, so that step is
 * skipped when dbCache is NULL instead of dereferencing a NULL pointer.
 * Finally the db entry, if any, is released.
 */
void ctgReleaseTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache) {
  if (pCache) {
    CTG_UNLOCK(CTG_READ, &pCache->indexLock);

    if (dbCache) {
      taosHashRelease(dbCache->tbCache, pCache);
    }
  }

  if (dbCache) {
    ctgReleaseDBCache(pCtg, dbCache);
  }
}

154
/*
 * Release what ctgAcquireVgMetaFromCache() handed out.
 *
 * Nothing is done for a NULL dbCache. Otherwise the table entry (if any) has
 * its metaLock CTG_READ hold dropped and its reference released, then the
 * vgroup read lock is dropped and the db entry is released.
 */
void ctgReleaseVgMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache) {
  if (NULL == dbCache) {
    return;
  }

  if (NULL != pCache) {
    CTG_UNLOCK(CTG_READ, &pCache->metaLock);
    taosHashRelease(dbCache->tbCache, pCache);
  }

  ctgRUnlockVgInfo(dbCache);
  ctgReleaseDBCache(pCtg, dbCache);
}

dengyihao's avatar
dengyihao 已提交
166
/*
 * Acquire the db cache entry of dbFName together with a read lock on its
 * vgroup info.
 *
 * Hit : *pCache is the db entry; the db lock/reference and the vgroup read
 *       lock stay with the caller (release with ctgReleaseVgInfoToCache()).
 *       numOfVgHit is incremented.
 * Miss: the db is not cached or has no usable vgInfo; anything acquired is
 *       released, *pCache is NULL and numOfVgMiss is incremented.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgAcquireVgInfoFromCache(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache) {
  SCtgDBCache *dbEntry = NULL;
  bool         vgReady = false;

  ctgAcquireDBCache(pCtg, dbFName, &dbEntry);
  if (NULL == dbEntry) {
    ctgDebug("db %s not in cache", dbFName);
  } else {
    ctgRLockVgInfo(pCtg, dbEntry, &vgReady);
    if (vgReady) {
      *pCache = dbEntry;
      CTG_CACHE_STAT_INC(numOfVgHit, 1);
      ctgDebug("Got db vgInfo from cache, dbFName:%s", dbFName);
      return TSDB_CODE_SUCCESS;
    }

    // ctgRLockVgInfo already dropped the vgroup lock; only the db entry is left to release.
    ctgDebug("vgInfo of db %s not in cache", dbFName);
    ctgReleaseDBCache(pCtg, dbEntry);
  }

  *pCache = NULL;
  CTG_CACHE_STAT_INC(numOfVgMiss, 1);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
202
/*
 * Look up the cached meta entry of table tbName in db dbFName.
 *
 * Hit : *pDb / *pTb receive the db and table cache entries. The db
 *       lock/reference taken by ctgAcquireDBCache(), the table entry
 *       reference and the CTG_READ hold on its metaLock stay with the caller,
 *       who gives them back with ctgReleaseTbMetaToCache(). numOfMetaHit is
 *       incremented.
 * Miss: the db is not cached, the table is not cached, or the table entry has
 *       no pMeta. Everything acquired here is released again, *pDb / *pTb are
 *       set to NULL (matching the other acquire helpers in this file) and
 *       numOfMetaMiss is incremented.
 *
 * Always returns TSDB_CODE_SUCCESS; callers detect a miss through *pTb.
 */
int32_t ctgAcquireTbMetaFromCache(SCatalog *pCtg, char *dbFName, char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb) {
  SCtgDBCache *dbCache = NULL;
  SCtgTbCache *pCache = NULL;

  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
  if (NULL == dbCache) {
    ctgDebug("db %s not in cache", dbFName);
    goto _return;
  }

  pCache = taosHashAcquire(dbCache->tbCache, tbName, strlen(tbName));
  if (NULL == pCache) {
    ctgDebug("tb %s not in cache, dbFName:%s", tbName, dbFName);
    goto _return;
  }

  CTG_LOCK(CTG_READ, &pCache->metaLock);
  if (NULL == pCache->pMeta) {
    ctgDebug("tb %s meta not in cache, dbFName:%s", tbName, dbFName);
    goto _return;
  }

  *pDb = dbCache;
  *pTb = pCache;

  ctgDebug("tb %s meta got in cache, dbFName:%s", tbName, dbFName);

  CTG_CACHE_STAT_INC(numOfMetaHit, 1);

  return TSDB_CODE_SUCCESS;

_return:

  // pCache is only non-NULL here once its metaLock has been taken, so the release helper's unlock is balanced.
  ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);

  // Never leave the out-parameters pointing at entries that were just released.
  *pDb = NULL;
  *pTb = NULL;

  CTG_CACHE_STAT_INC(numOfMetaMiss, 1);

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
241 242
/*
 * Acquire, in one go, the db entry with its vgroup read lock and the meta
 * entry of table tbName.
 *
 * Hit : *pDb / *pTb are the db and table entries; the db lock/reference, the
 *       vgroup read lock, the table reference and the CTG_READ hold on its
 *       metaLock stay with the caller (release with
 *       ctgReleaseVgMetaToCache()).
 * Miss: any of the steps failed; everything acquired so far is released and
 *       both *pDb and *pTb are NULL.
 *
 * Statistics: numOfVgHit / numOfVgMiss track the vgroup step and
 * numOfMetaHit / numOfMetaMiss the table-meta step. A vgroup hit followed by
 * a meta miss therefore counts one of each.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgAcquireVgMetaFromCache(SCatalog *pCtg, const char *dbFName, const char *tbName, SCtgDBCache **pDb,
                                  SCtgTbCache **pTb) {
  SCtgDBCache *dbEntry = NULL;
  SCtgTbCache *tbEntry = NULL;
  bool         vgLocked = false;

  // Step 1: the db entry itself.
  ctgAcquireDBCache(pCtg, dbFName, &dbEntry);
  if (NULL == dbEntry) {
    ctgDebug("db %s not in cache", dbFName);
    CTG_CACHE_STAT_INC(numOfVgMiss, 1);
    goto _miss;
  }

  // Step 2: its vgroup info, under the vgroup read lock.
  ctgRLockVgInfo(pCtg, dbEntry, &vgLocked);
  if (!vgLocked) {
    ctgDebug("vgInfo of db %s not in cache", dbFName);
    CTG_CACHE_STAT_INC(numOfVgMiss, 1);
    goto _miss;
  }

  CTG_CACHE_STAT_INC(numOfVgHit, 1);
  ctgDebug("Got db vgInfo from cache, dbFName:%s", dbFName);

  // Step 3: the table entry and its meta, under the meta read lock.
  tbEntry = taosHashAcquire(dbEntry->tbCache, tbName, strlen(tbName));
  if (NULL == tbEntry) {
    ctgDebug("tb %s not in cache, dbFName:%s", tbName, dbFName);
    CTG_CACHE_STAT_INC(numOfMetaMiss, 1);
    goto _miss;
  }

  CTG_LOCK(CTG_READ, &tbEntry->metaLock);
  if (NULL == tbEntry->pMeta) {
    ctgDebug("tb %s meta not in cache, dbFName:%s", tbName, dbFName);
    CTG_CACHE_STAT_INC(numOfMetaMiss, 1);
    goto _miss;
  }

  *pDb = dbEntry;
  *pTb = tbEntry;

  ctgDebug("tb %s meta got in cache, dbFName:%s", tbName, dbFName);
  CTG_CACHE_STAT_INC(numOfMetaHit, 1);

  return TSDB_CODE_SUCCESS;

_miss:

  // Unwind in reverse order of acquisition; tbEntry is only set once its metaLock is about to be held.
  if (tbEntry) {
    CTG_UNLOCK(CTG_READ, &tbEntry->metaLock);
    taosHashRelease(dbEntry->tbCache, tbEntry);
  }

  if (vgLocked) {
    ctgRUnlockVgInfo(dbEntry);
  }

  if (dbEntry) {
    ctgReleaseDBCache(pCtg, dbEntry);
  }

  *pDb = NULL;
  *pTb = NULL;

  return TSDB_CODE_SUCCESS;
}

310
/*
dengyihao's avatar
dengyihao 已提交
311 312 313
int32_t ctgAcquireStbMetaFromCache(SCatalog *pCtg, char *dbFName, uint64_t suid, SCtgDBCache **pDb, SCtgTbCache **pTb) {
  SCtgDBCache *dbCache = NULL;
  SCtgTbCache *pCache = NULL;
D
dapan1121 已提交
314 315 316 317 318
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
  if (NULL == dbCache) {
    ctgDebug("db %s not in cache", dbFName);
    goto _return;
  }
dengyihao's avatar
dengyihao 已提交
319 320

  char *stName = taosHashAcquire(dbCache->stbCache, &suid, sizeof(suid));
D
dapan1121 已提交
321
  if (NULL == stName) {
D
dapan1121 已提交
322
    ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", suid, dbFName);
D
dapan1121 已提交
323 324 325 326 327
    goto _return;
  }

  pCache = taosHashAcquire(dbCache->tbCache, stName, strlen(stName));
  if (NULL == pCache) {
D
dapan1121 已提交
328
    ctgDebug("stb 0x%" PRIx64 " name %s not in cache, dbFName:%s", suid, stName, dbFName);
D
dapan1121 已提交
329 330 331 332
    taosHashRelease(dbCache->stbCache, stName);
    goto _return;
  }

D
dapan1121 已提交
333 334
  taosHashRelease(dbCache->stbCache, stName);

D
dapan1121 已提交
335 336
  CTG_LOCK(CTG_READ, &pCache->metaLock);
  if (NULL == pCache->pMeta) {
D
dapan1121 已提交
337
    ctgDebug("stb 0x%" PRIx64 " meta not in cache, dbFName:%s", suid, dbFName);
D
dapan1121 已提交
338 339 340 341 342 343
    goto _return;
  }

  *pDb = dbCache;
  *pTb = pCache;

D
dapan1121 已提交
344
  ctgDebug("stb 0x%" PRIx64 " meta got in cache, dbFName:%s", suid, dbFName);
dengyihao's avatar
dengyihao 已提交
345

D
dapan1121 已提交
346
  CTG_CACHE_STAT_INC(numOfMetaHit, 1);
D
dapan1121 已提交
347 348 349 350 351 352 353

  return TSDB_CODE_SUCCESS;

_return:

  ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);

D
dapan1121 已提交
354
  CTG_CACHE_STAT_INC(numOfMetaMiss, 1);
D
dapan1121 已提交
355 356 357

  *pDb = NULL;
  *pTb = NULL;
dengyihao's avatar
dengyihao 已提交
358

D
dapan1121 已提交
359 360
  return TSDB_CODE_SUCCESS;
}
361
*/
D
dapan1121 已提交
362

X
Xiaoyu Wang 已提交
363 364
/*
 * Given an already acquired db entry, find the super table with id suid:
 * dbCache->stbCache maps the suid to the table name, which is then looked up
 * in dbCache->tbCache.
 *
 * Hit : *pTb is the super table entry, returned with a reference and a
 *       CTG_READ hold on its metaLock; numOfMetaHit is incremented.
 * Miss: *pTb is NULL and numOfMetaMiss is incremented. NOTE: the miss path
 *       goes through ctgReleaseTbMetaToCache(), which also releases dbCache
 *       itself, so the caller must not release or use dbCache afterwards.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgAcquireStbMetaFromCache(SCtgDBCache *dbCache, SCatalog *pCtg, char *dbFName, uint64_t suid,
                                   SCtgTbCache **pTb) {
  SCtgTbCache *stbEntry = NULL;

  char *stName = taosHashAcquire(dbCache->stbCache, &suid, sizeof(suid));
  if (NULL == stName) {
    ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", suid, dbFName);
  } else {
    stbEntry = taosHashAcquire(dbCache->tbCache, stName, strlen(stName));
    if (NULL == stbEntry) {
      ctgDebug("stb 0x%" PRIx64 " name %s not in cache, dbFName:%s", suid, stName, dbFName);
    }

    // The name is only needed for the lookup above; give it back in either case.
    taosHashRelease(dbCache->stbCache, stName);

    if (NULL != stbEntry) {
      CTG_LOCK(CTG_READ, &stbEntry->metaLock);
      if (NULL != stbEntry->pMeta) {
        *pTb = stbEntry;

        ctgDebug("stb 0x%" PRIx64 " meta got in cache, dbFName:%s", suid, dbFName);
        CTG_CACHE_STAT_INC(numOfMetaHit, 1);

        return TSDB_CODE_SUCCESS;
      }

      ctgDebug("stb 0x%" PRIx64 " meta not in cache, dbFName:%s", suid, dbFName);
    }
  }

  // Miss: releases the table entry (if locked above) and the db entry as well.
  ctgReleaseTbMetaToCache(pCtg, dbCache, stbEntry);

  CTG_CACHE_STAT_INC(numOfMetaMiss, 1);

  *pTb = NULL;

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
406
/*
 * Look up the cached index entry of table tbName in db dbFName.
 *
 * Hit : *pDb / *pTb receive the db and table cache entries. The db
 *       lock/reference, the table entry reference and the CTG_READ hold on
 *       its indexLock stay with the caller, who gives them back with
 *       ctgReleaseTbIndexToCache(). numOfIndexHit is incremented.
 * Miss: the db is not cached, the table is not cached, or the table entry has
 *       no pIndex. Everything acquired here is released again, *pDb / *pTb
 *       are set to NULL (matching the other acquire helpers in this file) and
 *       numOfIndexMiss is incremented.
 *
 * Always returns TSDB_CODE_SUCCESS; callers detect a miss through *pTb.
 */
int32_t ctgAcquireTbIndexFromCache(SCatalog *pCtg, char *dbFName, char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb) {
  SCtgDBCache *dbCache = NULL;
  SCtgTbCache *pCache = NULL;

  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
  if (NULL == dbCache) {
    ctgDebug("db %s not in cache", dbFName);
    goto _return;
  }

  pCache = taosHashAcquire(dbCache->tbCache, tbName, strlen(tbName));
  if (NULL == pCache) {
    ctgDebug("tb %s not in cache, dbFName:%s", tbName, dbFName);
    goto _return;
  }

  CTG_LOCK(CTG_READ, &pCache->indexLock);
  if (NULL == pCache->pIndex) {
    ctgDebug("tb %s index not in cache, dbFName:%s", tbName, dbFName);
    goto _return;
  }

  *pDb = dbCache;
  *pTb = pCache;

  ctgDebug("tb %s index got in cache, dbFName:%s", tbName, dbFName);

  CTG_CACHE_STAT_INC(numOfIndexHit, 1);

  return TSDB_CODE_SUCCESS;

_return:

  // pCache is only non-NULL here once its indexLock has been taken, so the release helper's unlock is balanced.
  ctgReleaseTbIndexToCache(pCtg, dbCache, pCache);

  // Never leave the out-parameters pointing at entries that were just released.
  *pDb = NULL;
  *pTb = NULL;

  CTG_CACHE_STAT_INC(numOfIndexMiss, 1);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
446
/*
 * Report through *exist (1 / 0) whether meta for table tbName of db dbFName
 * is currently cached. Whatever the lookup acquired is released before
 * returning. Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgTbMetaExistInCache(SCatalog *pCtg, char *dbFName, char *tbName, int32_t *exist) {
  SCtgDBCache *dbEntry = NULL;
  SCtgTbCache *tbEntry = NULL;

  ctgAcquireTbMetaFromCache(pCtg, dbFName, tbName, &dbEntry, &tbEntry);

  *exist = (NULL != tbEntry) ? 1 : 0;

  ctgReleaseTbMetaToCache(pCtg, dbEntry, tbEntry);

  return TSDB_CODE_SUCCESS;
}

X
Xiaoyu Wang 已提交
463 464
/*
 * Build a heap copy of the table meta held by the acquired cache entry *pTb
 * and return it through *pTableMeta (caller frees). ctx->tbInfo is filled
 * with inCache, dbId, suid and tbType of the cached table.
 *
 * Non-child table: the whole meta (CTG_META_SIZE bytes) is copied; *pDb and
 * *pTb are left untouched for the caller to release.
 *
 * Child table: only the leading sizeof(SCTableMeta) bytes come from the child
 * entry. That entry is then released (lock + reference) and the super table
 * is acquired by suid:
 *   - super table not cached: the partial copy is freed, *pTableMeta ends up
 *     NULL, *pTb is NULL and *pDb is set to NULL because
 *     ctgAcquireStbMetaFromCache() already released the db entry on its miss
 *     path; TSDB_CODE_SUCCESS is returned.
 *   - super table cached: *pTb is switched to the super table entry (still
 *     locked, caller releases it) and everything from the sversion field
 *     onward is copied from the super table meta into the enlarged buffer.
 *
 * Errors (passed to CTG_ERR_RET): TSDB_CODE_OUT_OF_MEMORY on allocation
 * failure, TSDB_CODE_CTG_INTERNAL_ERROR when the cached super table's suid
 * does not match. On those paths taken after the first allocation, the copy
 * is freed and *pTableMeta is NULL.
 */
int32_t ctgCopyTbMeta(SCatalog *pCtg, SCtgTbMetaCtx *ctx, SCtgDBCache **pDb, SCtgTbCache **pTb, STableMeta **pTableMeta,
                      char *dbFName) {
  SCtgDBCache *dbCache = *pDb;
  SCtgTbCache *tbCache = *pTb;
  STableMeta  *tbMeta = tbCache->pMeta;
  ctx->tbInfo.inCache = true;
  ctx->tbInfo.dbId = dbCache->dbId;
  ctx->tbInfo.suid = tbMeta->suid;
  ctx->tbInfo.tbType = tbMeta->tableType;

  if (tbMeta->tableType != TSDB_CHILD_TABLE) {
    int32_t metaSize = CTG_META_SIZE(tbMeta);
    *pTableMeta = taosMemoryCalloc(1, metaSize);
    if (NULL == *pTableMeta) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    memcpy(*pTableMeta, tbMeta, metaSize);

    ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", ctx->pName->tname, tbMeta->tableType, dbFName);
    return TSDB_CODE_SUCCESS;
  }

  // PROCESS FOR CHILD TABLE

  int32_t metaSize = sizeof(SCTableMeta);
  *pTableMeta = taosMemoryCalloc(1, metaSize);
  if (NULL == *pTableMeta) {
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  memcpy(*pTableMeta, tbMeta, metaSize);

  // Done with the child entry: drop its meta lock and reference, but keep the db entry for the stb lookup.
  CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
  taosHashRelease(dbCache->tbCache, tbCache);
  *pTb = NULL;

  ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", ctx->pName->tname,
           ctx->tbInfo.tbType, dbFName);

  ctgAcquireStbMetaFromCache(dbCache, pCtg, dbFName, ctx->tbInfo.suid, &tbCache);
  if (NULL == tbCache) {
    taosMemoryFreeClear(*pTableMeta);
    // The stb lookup released dbCache on its miss path; make sure the caller does not release it again.
    *pDb = NULL;
    ctgDebug("stb 0x%" PRIx64 " meta not in cache", ctx->tbInfo.suid);
    return TSDB_CODE_SUCCESS;
  }

  *pTb = tbCache;

  STableMeta *stbMeta = tbCache->pMeta;
  if (stbMeta->suid != ctx->tbInfo.suid) {
    ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%" PRIx64, stbMeta->suid, ctx->tbInfo.suid);
    taosMemoryFreeClear(*pTableMeta);
    *pTableMeta = NULL;
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  metaSize = CTG_META_SIZE(stbMeta);

  // Grow through a temporary so the original buffer is not lost if the reallocation fails
  // (assumes taosMemoryRealloc leaves the old block valid on failure, like realloc).
  STableMeta *grown = taosMemoryRealloc(*pTableMeta, metaSize);
  if (NULL == grown) {
    taosMemoryFreeClear(*pTableMeta);
    *pTableMeta = NULL;
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }
  *pTableMeta = grown;

  // Keep the child-specific head copied earlier; take everything from sversion onward from the super table.
  memcpy(&(*pTableMeta)->sversion, &stbMeta->sversion, metaSize - sizeof(SCTableMeta));

  return TSDB_CODE_SUCCESS;
}

/*
 * Read a copy of the table meta for ctx->pName from the cache.
 *
 * *pTableMeta is NULL when the table is not cached; otherwise it is whatever
 * ctgCopyTbMeta() produced (which may itself be NULL when a child table's
 * super table is not cached). All cache entries acquired along the way are
 * released before returning. On a ctgCopyTbMeta() failure the copy is freed,
 * *pTableMeta is NULL and the error code is returned.
 */
int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta **pTableMeta) {
  int32_t      code = 0;
  SCtgDBCache *dbEntry = NULL;
  SCtgTbCache *tbEntry = NULL;
  char         dbFName[TSDB_DB_FNAME_LEN] = {0};

  *pTableMeta = NULL;

  // System dbs are keyed by their plain name, everything else by the full db name.
  if (CTG_FLAG_IS_SYS_DB(ctx->flag)) {
    strcpy(dbFName, ctx->pName->dbname);
  } else {
    tNameGetFullDbName(ctx->pName, dbFName);
  }

  ctgAcquireTbMetaFromCache(pCtg, dbFName, ctx->pName->tname, &dbEntry, &tbEntry);
  if (NULL == tbEntry) {
    ctgReleaseTbMetaToCache(pCtg, dbEntry, tbEntry);
    return TSDB_CODE_SUCCESS;
  }

  // ctgCopyTbMeta may swap or clear dbEntry / tbEntry; release whatever it leaves behind.
  CTG_ERR_JRET(ctgCopyTbMeta(pCtg, ctx, &dbEntry, &tbEntry, pTableMeta, dbFName));

  ctgReleaseTbMetaToCache(pCtg, dbEntry, tbEntry);
  ctgDebug("Got tb %s meta from cache, dbFName:%s", ctx->pName->tname, dbFName);

  return TSDB_CODE_SUCCESS;

_return:

  ctgReleaseTbMetaToCache(pCtg, dbEntry, tbEntry);

  taosMemoryFreeClear(*pTableMeta);
  *pTableMeta = NULL;

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
569 570
/*
 * Read the schema/tag versions of table pTableName from the cache.
 *
 * *sver / *tver start out as -1 and stay -1 when the table (or, for a child
 * table, its super table) is not cached. When the table is cached, *tbType
 * and *suid are filled from its meta. For a child table the versions are
 * taken from the super table found through suid, and the super table's name
 * (the hash key of its cache entry) is copied into stbName as a
 * NUL-terminated string; stbName must be large enough for it (size not
 * checked here).
 *
 * All cache entries acquired along the way are released before returning.
 * Returns TSDB_CODE_SUCCESS, or passes TSDB_CODE_CTG_INTERNAL_ERROR to
 * CTG_ERR_RET when the cached super table's suid does not match.
 */
int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver, int32_t *tver, int32_t *tbType,
                              uint64_t *suid, char *stbName) {
  *sver = -1;
  *tver = -1;

  SCtgDBCache *dbCache = NULL;
  SCtgTbCache *tbCache = NULL;
  char         dbFName[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, dbFName);

  ctgAcquireTbMetaFromCache(pCtg, dbFName, pTableName->tname, &dbCache, &tbCache);
  if (NULL == tbCache) {
    ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
    return TSDB_CODE_SUCCESS;
  }

  STableMeta *tbMeta = tbCache->pMeta;
  *tbType = tbMeta->tableType;
  *suid = tbMeta->suid;

  if (*tbType != TSDB_CHILD_TABLE) {
    *sver = tbMeta->sversion;
    *tver = tbMeta->tversion;

    ctgDebug("Got tb %s ver from cache, dbFName:%s, tbType:%d, sver:%d, tver:%d, suid:0x%" PRIx64, pTableName->tname,
             dbFName, *tbType, *sver, *tver, *suid);

    ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
    return TSDB_CODE_SUCCESS;
  }

  // PROCESS FOR CHILD TABLE

  // Done with the child entry (known non-NULL here): drop its meta lock and reference,
  // but keep the db entry for the super table lookup.
  CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
  taosHashRelease(dbCache->tbCache, tbCache);

  ctgDebug("Got ctb %s ver from cache, will continue to get its stb ver, dbFName:%s", pTableName->tname, dbFName);

  ctgAcquireStbMetaFromCache(dbCache, pCtg, dbFName, *suid, &tbCache);
  if (NULL == tbCache) {
    // Nothing left to release: the stb lookup already released dbCache on its miss path.
    ctgDebug("stb 0x%" PRIx64 " meta not in cache", *suid);
    return TSDB_CODE_SUCCESS;
  }

  STableMeta *stbMeta = tbCache->pMeta;
  if (stbMeta->suid != *suid) {
    // Log while the entry is still locked and referenced; stbMeta must not be read after the release.
    ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid:0x%" PRIx64, stbMeta->suid, *suid);
    ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  // The super table's name is the key under which its entry is stored in the table hash.
  size_t nameLen = 0;
  char  *name = taosHashGetKey(tbCache, &nameLen);

  strncpy(stbName, name, nameLen);
  stbName[nameLen] = 0;

  *sver = stbMeta->sversion;
  *tver = stbMeta->tversion;

  ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);

  ctgDebug("Got tb %s sver %d tver %d from cache, type:%d, dbFName:%s", pTableName->tname, *sver, *tver, *tbType,
           dbFName);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
641
/*
 * Read the table type of tbName (db dbFName) from the cache into *tbType.
 * *tbType is left untouched when the table meta is not cached. Cache entries
 * acquired for the lookup are released before returning.
 */
int32_t ctgReadTbTypeFromCache(SCatalog *pCtg, char *dbFName, char *tbName, int32_t *tbType) {
  SCtgDBCache *dbEntry = NULL;
  SCtgTbCache *tbEntry = NULL;

  CTG_ERR_RET(ctgAcquireTbMetaFromCache(pCtg, dbFName, tbName, &dbEntry, &tbEntry));

  if (NULL != tbEntry) {
    // Copy the value out while the meta read lock is still held.
    *tbType = tbEntry->pMeta->tableType;
    ctgReleaseTbMetaToCache(pCtg, dbEntry, tbEntry);

    ctgDebug("Got tb %s tbType %d from cache, dbFName:%s", tbName, *tbType, dbFName);

    return TSDB_CODE_SUCCESS;
  }

  ctgReleaseTbMetaToCache(pCtg, dbEntry, tbEntry);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
658 659
/*
 * Read a clone of the cached index list of table pTableName into *pRes.
 *
 * *pRes is set to NULL first and stays NULL when the table index is not
 * cached (TSDB_CODE_SUCCESS is returned in that case). Otherwise the result
 * of ctgCloneTableIndex() is returned. Cache entries acquired for the lookup
 * are released before returning.
 */
int32_t ctgReadTbIndexFromCache(SCatalog *pCtg, SName *pTableName, SArray **pRes) {
  int32_t      code = 0;
  SCtgDBCache *dbEntry = NULL;
  SCtgTbCache *tbEntry = NULL;
  char         dbFName[TSDB_DB_FNAME_LEN] = {0};

  *pRes = NULL;

  tNameGetFullDbName(pTableName, dbFName);

  ctgAcquireTbIndexFromCache(pCtg, dbFName, pTableName->tname, &dbEntry, &tbEntry);
  if (NULL == tbEntry) {
    ctgReleaseTbIndexToCache(pCtg, dbEntry, tbEntry);
    return TSDB_CODE_SUCCESS;
  }

  // Clone while the index read lock is held; success and failure share the release below.
  CTG_ERR_JRET(ctgCloneTableIndex(tbEntry->pIndex->pIndex, pRes));

_return:

  ctgReleaseTbIndexToCache(pCtg, dbEntry, tbEntry);

  CTG_RET(code);
}

X
Xiaoyu Wang 已提交
682
/*
 * Try to answer an authorization request from the user cache.
 *
 * - Requests against a system db pass immediately: *inCache is true and
 *   pRes->pRawRes->pass is set.
 * - If the user is cached, its auth info is copied under the user's read lock
 *   and evaluated by ctgChkSetAuthRes(); *inCache is true and numOfUserHit is
 *   incremented.
 * - Miss path: the user is not cached, pRes->metaNotExists is set after the
 *   check, or CTG_ERR_JRET diverts to the _return label. *inCache is false,
 *   numOfUserMiss is incremented and TSDB_CODE_SUCCESS is returned
 *   (NOTE(review): an error from ctgChkSetAuthRes presumably ends up here and
 *   is reported as a plain miss -- confirm this is intended).
 */
int32_t ctgChkAuthFromCache(SCatalog *pCtg, SUserAuthInfo *pReq, bool *inCache, SCtgAuthRsp *pRes) {
  int32_t code = TSDB_CODE_SUCCESS;

  if (IS_SYS_DBNAME(pReq->tbName.dbname)) {
    *inCache = true;
    pRes->pRawRes->pass = true;
    ctgDebug("sysdb %s, pass", pReq->tbName.dbname);
    return TSDB_CODE_SUCCESS;
  }

  SCtgUserAuth *pUser = (SCtgUserAuth *)taosHashGet(pCtg->userCache, pReq->user, strlen(pReq->user));
  if (NULL == pUser) {
    ctgDebug("user not in cache, user:%s", pReq->user);
    goto _return;
  }

  *inCache = true;

  ctgDebug("Got user from cache, user:%s", pReq->user);
  CTG_CACHE_STAT_INC(numOfUserHit, 1);

  SCtgAuthReq req = {0};
  req.pRawReq = pReq;
  req.onlyCache = true;

  // Snapshot the cached auth info and evaluate it while the user's read lock is held.
  CTG_LOCK(CTG_READ, &pUser->lock);
  memcpy(&req.authInfo, &pUser->userAuth, sizeof(pUser->userAuth));
  code = ctgChkSetAuthRes(pCtg, &req, pRes);
  CTG_UNLOCK(CTG_READ, &pUser->lock);

  CTG_ERR_JRET(code);

  if (!pRes->metaNotExists) {
    CTG_RET(code);
  }

_return:

  *inCache = false;
  CTG_CACHE_STAT_INC(numOfUserMiss, 1);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
725
/*
 * Pop the next operation from the global catalog queue.
 *
 * The current head node is released and its successor becomes the new head;
 * the successor's operation is handed back through *op. The successor is not
 * checked for NULL here.
 * NOTE(review): presumably callers only invoke this after the request
 * semaphore signalled a pending node - confirm.
 */
void ctgDequeue(SCtgCacheOperation **op) {
  SCtgQNode *stale = gCtgMgmt.queue.head;
  SCtgQNode *next = stale->next;

  gCtgMgmt.queue.head = next;

  CTG_QUEUE_DEC();

  taosMemoryFreeClear(stale);

  *op = next->op;
}

dengyihao's avatar
dengyihao 已提交
738
/*
 * Append a cache operation to the global catalog queue and signal the request
 * semaphore.
 *
 * pCtg      - catalog the operation belongs to (used for logging)
 * operation - heap-allocated operation
 *
 * Failure handling visible here:
 *  - queue node allocation fails: operation->data and operation are freed and
 *    TSDB_CODE_OUT_OF_MEMORY is returned;
 *  - the queue has been stopped: the node is released through ctgFreeQNode()
 *    and TSDB_CODE_CTG_EXIT is returned.
 *
 * For a synchronous operation the function blocks on operation->rspSem and
 * frees the operation afterwards. Unless operation->unLocked is set, the
 * global read lock is dropped while waiting and re-taken afterwards.
 */
int32_t ctgEnqueue(SCatalog *pCtg, SCtgCacheOperation *operation) {
  SCtgQNode *qnode = taosMemoryCalloc(1, sizeof(SCtgQNode));
  if (NULL == qnode) {
    qError("calloc %d failed", (int32_t)sizeof(SCtgQNode));
    taosMemoryFree(operation->data);
    taosMemoryFree(operation);
    CTG_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  // Copied out before the node is linked; later code uses these copies
  // instead of reading them from `operation` again.
  // NOTE(review): presumably because an async operation may be released by
  // the consumer once queued - confirm.
  bool  isSync = operation->syncOp;
  char *actionName = gCtgCacheOperation[operation->opId].name;

  if (isSync) {
    tsem_init(&operation->rspSem, 0, 0);
  }

  qnode->op = operation;

  CTG_LOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);

  if (gCtgMgmt.queue.stopQueue) {
    ctgFreeQNode(qnode);
    CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);
    CTG_RET(TSDB_CODE_CTG_EXIT);
  }

  gCtgMgmt.queue.tail->next = qnode;
  gCtgMgmt.queue.tail = qnode;

  // An operation may request that no further operations are accepted after it.
  gCtgMgmt.queue.stopQueue = operation->stopQueue;

  CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);

  ctgDebug("action [%s] added into queue", actionName);

  CTG_QUEUE_INC();
  CTG_RT_STAT_INC(numOfOpEnqueue, 1);

  tsem_post(&gCtgMgmt.queue.reqSem);

  if (!isSync) {
    return TSDB_CODE_SUCCESS;
  }

  if (!operation->unLocked) {
    CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock);
  }

  tsem_wait(&operation->rspSem);

  if (!operation->unLocked) {
    CTG_LOCK(CTG_READ, &gCtgMgmt.lock);
  }

  taosMemoryFree(operation);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
791 792
/*
 * Queue a synchronous "drop DB" cache operation.
 *
 * pCtg    - catalog owning the cache
 * dbFName - full database name; when the part after the first '.' is a system
 *           database name, only that part is used
 * dbId    - id of the database to drop
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the operation or its
 * message cannot be allocated, or the error reported by ctgEnqueue().
 */
int32_t ctgDropDbCacheEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Previously a failed allocation was dereferenced right below.
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_DROP_DB_CACHE;
  op->syncOp = true;

  SCtgDropDBMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDBMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDBMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  // Strip the leading "<prefix>." from system database names.
  char *p = strchr(dbFName, '.');
  if (p && IS_SYS_DBNAME(p + 1)) {
    dbFName = p + 1;
  }

  msg->pCtg = pCtg;
  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  msg->dbId = dbId;

  op->data = msg;

  // On the failure paths visible in ctgEnqueue() op and msg are released there.
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
824 825
/*
 * Queue a "drop DB vgroup" cache operation.
 *
 * pCtg    - catalog owning the cache
 * dbFName - full database name; when the part after the first '.' is a system
 *           database name, only that part is used
 * syncOp  - when true, ctgEnqueue() waits until the operation signals completion
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the operation or its
 * message cannot be allocated, or the error reported by ctgEnqueue().
 */
int32_t ctgDropDbVgroupEnqueue(SCatalog *pCtg, const char *dbFName, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Previously a failed allocation was dereferenced right below.
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_DROP_DB_VGROUP;
  op->syncOp = syncOp;

  SCtgDropDbVgroupMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDbVgroupMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDbVgroupMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  // Strip the leading "<prefix>." from system database names.
  char *p = strchr(dbFName, '.');
  if (p && IS_SYS_DBNAME(p + 1)) {
    dbFName = p + 1;
  }

  msg->pCtg = pCtg;
  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));

  op->data = msg;

  // On the failure paths visible in ctgEnqueue() op and msg are released there.
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
856 857 858
/*
 * Queue a "drop stbMeta" cache operation.
 *
 * pCtg    - catalog owning the cache
 * dbFName - full database name, copied into the message
 * dbId    - database id
 * stbName - super table name, copied into the message
 * suid    - super table uid
 * syncOp  - when true, ctgEnqueue() waits until the operation signals completion
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the operation or its
 * message cannot be allocated, or the error reported by ctgEnqueue().
 */
int32_t ctgDropStbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid,
                              bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Previously a failed allocation was dereferenced right below.
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_DROP_STB_META;
  op->syncOp = syncOp;

  SCtgDropStbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropStbMetaMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropStbMetaMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  tstrncpy(msg->stbName, stbName, sizeof(msg->stbName));
  msg->dbId = dbId;
  msg->suid = suid;

  op->data = msg;

  // On the failure paths visible in ctgEnqueue() op and msg are released there.
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
887 888
/*
 * Queue a "drop tbMeta" cache operation.
 *
 * pCtg    - catalog owning the cache
 * dbFName - full database name, copied into the message
 * dbId    - database id
 * tbName  - table name, copied into the message
 * syncOp  - when true, ctgEnqueue() waits until the operation signals completion
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the operation or its
 * message cannot be allocated, or the error reported by ctgEnqueue().
 */
int32_t ctgDropTbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Previously a failed allocation was dereferenced right below.
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_DROP_TB_META;
  op->syncOp = syncOp;

  SCtgDropTblMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTblMetaMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTblMetaMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  tstrncpy(msg->tbName, tbName, sizeof(msg->tbName));
  msg->dbId = dbId;

  op->data = msg;

  // On the failure paths visible in ctgEnqueue() op and msg are released there.
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
916 917
/*
 * Queue an "update vgInfo" cache operation.
 *
 * pCtg    - catalog owning the cache
 * dbFName - full database name; when the part after the first '.' is a system
 *           database name, only that part is used
 * dbId    - database id
 * dbInfo  - vgroup info handed over to the queued message; on every failure
 *           path of this function it is released with freeVgInfo()
 * syncOp  - when true, ctgEnqueue() waits until the operation signals completion
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY on allocation failure, the
 * error of ctgMakeVgArray(), or the error reported by ctgEnqueue().
 */
int32_t ctgUpdateVgroupEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, SDBVgInfo *dbInfo, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Previously a failed allocation was dereferenced right below.
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_UPDATE_VGROUP;
  op->syncOp = syncOp;

  SCtgUpdateVgMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateVgMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg));
    taosMemoryFree(op);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  // Strip the leading "<prefix>." from system database names.
  char *p = strchr(dbFName, '.');
  if (p && IS_SYS_DBNAME(p + 1)) {
    dbFName = p + 1;
  }

  code = ctgMakeVgArray(dbInfo);
  if (code) {
    taosMemoryFree(op);
    taosMemoryFree(msg);
    CTG_ERR_JRET(code);
  }

  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  msg->pCtg = pCtg;
  msg->dbId = dbId;
  msg->dbInfo = dbInfo;

  op->data = msg;

  // On the failure paths visible in ctgEnqueue() op and msg are released
  // there; dbInfo is released below.
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  // Single cleanup point: every failure releases the vgroup info.
  freeVgInfo(dbInfo);
  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
960 961
/*
 * Queue an "update tbMeta" cache operation.
 *
 * pCtg   - catalog owning the cache
 * output - table meta output handed over to the queued message; on every
 *          failure path of this function output->tbMeta and output are freed
 * syncOp - when true, ctgEnqueue() waits until the operation signals completion
 *
 * When output->dbFName has the form "<prefix>.<system db name>", the prefix is
 * stripped in place.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY on allocation failure, or
 * the error reported by ctgEnqueue().
 */
int32_t ctgUpdateTbMetaEnqueue(SCatalog *pCtg, STableMetaOutput *output, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Previously a failed allocation was dereferenced right below.
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_UPDATE_TB_META;
  op->syncOp = syncOp;

  SCtgUpdateTbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbMetaMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbMetaMsg));
    taosMemoryFree(op);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  char *p = strchr(output->dbFName, '.');
  if (p && IS_SYS_DBNAME(p + 1)) {
    int32_t len = strlen(p + 1);
    if (len >= TSDB_DB_FNAME_LEN) {
      len = TSDB_DB_FNAME_LEN - 1;
    }

    // Source and destination overlap, hence memmove. The terminator must be
    // written explicitly: moving only `len` bytes leaves the tail of the old,
    // longer name behind the shifted one.
    memmove(output->dbFName, p + 1, len);
    output->dbFName[len] = '\0';
  }

  msg->pCtg = pCtg;
  msg->pMeta = output;

  op->data = msg;

  // On the failure paths visible in ctgEnqueue() op and msg are released
  // there; output is released below.
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  if (output) {
    taosMemoryFree(output->tbMeta);
    taosMemoryFree(output);
  }

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
998 999
/*
 * Queue an "update epset" cache operation. syncOp is left at its
 * zero-initialized value (false).
 *
 * pCtg    - catalog owning the cache
 * dbFName - full database name, copied into the message
 * vgId    - vgroup whose endpoint set is updated
 * pEpSet  - new endpoint set, copied by value into the message
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the operation or its
 * message cannot be allocated, or the error reported by ctgEnqueue().
 */
int32_t ctgUpdateVgEpsetEnqueue(SCatalog *pCtg, char *dbFName, int32_t vgId, SEpSet *pEpSet) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Previously a failed allocation was dereferenced right below.
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_UPDATE_VG_EPSET;

  SCtgUpdateEpsetMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateEpsetMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateEpsetMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  msg->vgId = vgId;
  msg->epSet = *pEpSet;

  op->data = msg;

  // On the failure paths visible in ctgEnqueue() op and msg are released there.
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1026 1027
/*
 * Queue an "update user" cache operation.
 *
 * pCtg   - catalog owning the cache
 * pAuth  - user auth response; copied by value into the message. On every
 *          failure path of this function its contents are released with
 *          tFreeSGetUserAuthRsp().
 * syncOp - when true, ctgEnqueue() waits until the operation signals completion
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY on allocation failure, or
 * the error reported by ctgEnqueue().
 */
int32_t ctgUpdateUserEnqueue(SCatalog *pCtg, SGetUserAuthRsp *pAuth, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Previously a failed allocation was dereferenced right below.
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_UPDATE_USER;
  op->syncOp = syncOp;

  SCtgUpdateUserMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateUserMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateUserMsg));
    taosMemoryFree(op);
    // Jump to the shared cleanup so pAuth is released here as well, matching
    // the enqueue-failure path.
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  msg->userAuth = *pAuth;

  op->data = msg;

  // On the failure paths visible in ctgEnqueue() op and msg are released
  // there; the auth response is released below.
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  tFreeSGetUserAuthRsp(pAuth);

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1055 1056
/*
 * Queue an "update tbIndex" cache operation.
 *
 * pCtg   - catalog owning the cache
 * pIndex - in/out: *pIndex is handed over to the queued message. On success
 *          *pIndex is set to NULL; on failure the index array and *pIndex are
 *          freed and *pIndex is cleared.
 * syncOp - when true, ctgEnqueue() waits until the operation signals completion
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY on allocation failure, or
 * the error reported by ctgEnqueue().
 */
int32_t ctgUpdateTbIndexEnqueue(SCatalog *pCtg, STableIndex **pIndex, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Previously a failed allocation was dereferenced right below.
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_UPDATE_TB_INDEX;
  op->syncOp = syncOp;

  SCtgUpdateTbIndexMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbIndexMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbIndexMsg));
    taosMemoryFree(op);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  msg->pIndex = *pIndex;

  op->data = msg;

  // On the failure paths visible in ctgEnqueue() op and msg are released
  // there; the index is released below.
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  *pIndex = NULL;
  return TSDB_CODE_SUCCESS;

_return:

  taosArrayDestroyEx((*pIndex)->pIndex, tFreeSTableIndexInfo);
  taosMemoryFreeClear(*pIndex);

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1086 1087
/*
 * Queue a "drop tbIndex" cache operation.
 *
 * pCtg   - catalog owning the cache
 * pName  - table name; its full db name and table name are copied into the
 *          message
 * syncOp - when true, ctgEnqueue() waits until the operation signals completion
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the operation or its
 * message cannot be allocated, or the error reported by ctgEnqueue().
 */
int32_t ctgDropTbIndexEnqueue(SCatalog *pCtg, SName *pName, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Previously a failed allocation was dereferenced right below.
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_DROP_TB_INDEX;
  op->syncOp = syncOp;

  SCtgDropTbIndexMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTbIndexMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTbIndexMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  tNameGetFullDbName(pName, msg->dbFName);
  // Bounded copy, as in the other enqueue helpers of this file.
  tstrncpy(msg->tbName, pName->tname, sizeof(msg->tbName));

  op->data = msg;

  // On the failure paths visible in ctgEnqueue() op and msg are released there.
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1114 1115
/*
 * Queue a "clear cache" operation.
 *
 * pCtg      - catalog to clear, stored in the message
 * freeCtg   - stored in the message as freeCtg
 * stopQueue - stored on the operation; ctgEnqueue() copies it to the queue's
 *             stop flag
 * syncOp    - when true, ctgEnqueue() waits until the operation signals completion
 *
 * The operation is marked unLocked, so ctgEnqueue() does not touch the global
 * read lock while waiting.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the operation or its
 * message cannot be allocated, or the error reported by ctgEnqueue().
 */
int32_t ctgClearCacheEnqueue(SCatalog *pCtg, bool freeCtg, bool stopQueue, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Previously a failed allocation was dereferenced right below.
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_CLEAR_CACHE;
  op->syncOp = syncOp;
  op->stopQueue = stopQueue;
  op->unLocked = true;

  SCtgClearCacheMsg *msg = taosMemoryMalloc(sizeof(SCtgClearCacheMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgClearCacheMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  msg->freeCtg = freeCtg;
  op->data = msg;

  // On the failure paths visible in ctgEnqueue() op and msg are released there.
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

D
dapan1121 已提交
1142 1143 1144 1145 1146 1147
/*
 * Initialize a rent manager.
 *
 * mgmt    - manager to initialize
 * rentSec - rent period in seconds; the slot count is rentSec / CTG_RENT_SLOT_SECOND
 * type    - stored in mgmt->type and used in log messages
 *
 * Allocates a zeroed slot array. Returns TSDB_CODE_OUT_OF_MEMORY when the
 * allocation fails, TSDB_CODE_SUCCESS otherwise.
 */
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) {
  mgmt->type = type;
  mgmt->slotRIdx = 0;
  mgmt->slotNum = rentSec / CTG_RENT_SLOT_SECOND;

  size_t slotBytes = sizeof(SCtgRentSlot) * mgmt->slotNum;

  mgmt->slots = taosMemoryCalloc(1, slotBytes);
  if (mgmt->slots != NULL) {
    qDebug("meta rent initialized, type:%d, slotNum:%d", type, mgmt->slotNum);
    return TSDB_CODE_SUCCESS;
  }

  qError("calloc %d failed", (int32_t)slotBytes);
  CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);

  return TSDB_CODE_SUCCESS;
}

/*
 * Append one meta record to the rent slot selected by id.
 *
 * mgmt - rent manager
 * meta - record to copy into the slot array
 * id   - record id; the slot index is |id % slotNum|
 * size - element size, used when the slot array is created on first use
 *
 * The slot is modified under its write lock and flagged as needing a sort.
 * Returns TSDB_CODE_OUT_OF_MEMORY when the array cannot be created or grown.
 */
int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size) {
  int32_t       code = 0;
  int16_t       slotIdx = abs((int)(id % mgmt->slotNum));
  SCtgRentSlot *slot = &mgmt->slots[slotIdx];

  CTG_LOCK(CTG_WRITE, &slot->lock);

  // Lazily create the backing array for this slot.
  if (NULL == slot->meta) {
    slot->meta = taosArrayInit(CTG_DEFAULT_RENT_SLOT_SIZE, size);
    if (NULL == slot->meta) {
      qError("taosArrayInit %d failed, id:0x%" PRIx64 ", slot idx:%d, type:%d", CTG_DEFAULT_RENT_SLOT_SIZE, id, slotIdx,
             mgmt->type);
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }
  }

  if (NULL == taosArrayPush(slot->meta, meta)) {
    qError("taosArrayPush meta to rent failed, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  // The new element was appended, so the array has to be re-sorted before a search.
  slot->needSort = true;

  qDebug("add meta to rent, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);

_return:

  CTG_UNLOCK(CTG_WRITE, &slot->lock);
  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1191 1192
/*
 * Overwrite the rent record with the given id, or add it when it is missing.
 *
 * mgmt          - rent manager
 * meta          - new record contents (size bytes are copied)
 * id            - record id; the slot index is |id % slotNum|
 * size          - record size in bytes
 * sortCompare   - comparator used to sort the slot when it is flagged needSort
 * searchCompare - comparator used to look the id up in the sorted slot
 *
 * When the slot is empty or the id is not found, the update is treated as
 * failed and the record is added through ctgMetaRentAdd() after the slot lock
 * has been released; that call's result is returned.
 */
int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t sortCompare,
                          __compar_fn_t searchCompare) {
  int32_t       code = 0;
  int16_t       slotIdx = abs((int)(id % mgmt->slotNum));
  SCtgRentSlot *slot = &mgmt->slots[slotIdx];

  CTG_LOCK(CTG_WRITE, &slot->lock);

  if (NULL == slot->meta) {
    qDebug("empty meta slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  // Sort on demand so the search below operates on ordered data.
  if (slot->needSort) {
    qDebug("meta slot before sorte, slot idx:%d, type:%d, size:%d", slotIdx, mgmt->type,
           (int32_t)taosArrayGetSize(slot->meta));
    taosArraySort(slot->meta, sortCompare);
    slot->needSort = false;
    qDebug("meta slot sorted, slot idx:%d, type:%d, size:%d", slotIdx, mgmt->type,
           (int32_t)taosArrayGetSize(slot->meta));
  }

  void *existing = taosArraySearch(slot->meta, &id, searchCompare, TD_EQ);
  if (NULL == existing) {
    qDebug("meta not found in slot, id:0x%" PRIx64 ", slot idx:%d, type:%d, size:%d", id, slotIdx, mgmt->type,
           (int32_t)taosArrayGetSize(slot->meta));
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  memcpy(existing, meta, size);

  qDebug("meta in rent updated, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);

_return:

  CTG_UNLOCK(CTG_WRITE, &slot->lock);

  if (0 == code) {
    CTG_RET(code);
  }

  qDebug("meta in rent update failed, will try to add it, code:%x, id:0x%" PRIx64 ", slot idx:%d, type:%d", code, id,
         slotIdx, mgmt->type);
  CTG_RET(ctgMetaRentAdd(mgmt, meta, id, size));
}

/*
 * Remove the rent record with the given id.
 *
 * mgmt          - rent manager
 * id            - record id; the slot index is |id % slotNum|
 * sortCompare   - comparator used to sort the slot when it is flagged needSort
 * searchCompare - comparator used to look the id up in the sorted slot
 *
 * Returns TSDB_CODE_CTG_INTERNAL_ERROR when the slot has no array or the id is
 * not found, TSDB_CODE_SUCCESS otherwise. The slot is modified under its
 * write lock.
 */
int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t sortCompare, __compar_fn_t searchCompare) {
  int32_t       code = 0;
  int16_t       slotIdx = abs((int)(id % mgmt->slotNum));
  SCtgRentSlot *slot = &mgmt->slots[slotIdx];

  CTG_LOCK(CTG_WRITE, &slot->lock);

  if (NULL == slot->meta) {
    qError("empty meta slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  // Sort on demand so the search below operates on ordered data.
  if (slot->needSort) {
    taosArraySort(slot->meta, sortCompare);
    slot->needSort = false;
    qDebug("meta slot sorted, slot idx:%d, type:%d", slotIdx, mgmt->type);
  }

  int32_t pos = taosArraySearchIdx(slot->meta, &id, searchCompare, TD_EQ);
  if (pos < 0) {
    qError("meta not found in slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  taosArrayRemove(slot->meta, pos);

  qDebug("meta in rent removed, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);

_return:

  CTG_UNLOCK(CTG_WRITE, &slot->lock);

  CTG_RET(code);
}

/*
 * Copy out all records of the next rent slot.
 *
 * mgmt - rent manager; its read index is advanced by one and wrapped at slotNum
 * res  - out: newly allocated buffer holding the records, or NULL when the
 *        slot is empty or on failure; the buffer is returned to the caller
 * num  - out: number of records in *res (0 when *res is NULL)
 * size - size of one record in bytes
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the result buffer
 * cannot be allocated.
 */
int32_t ctgMetaRentGetImpl(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
  // Give both outputs a defined value on every path; previously *res was left
  // untouched for empty slots and *num on allocation failure.
  *res = NULL;
  *num = 0;

  int16_t ridx = atomic_add_fetch_16(&mgmt->slotRIdx, 1);
  if (ridx >= mgmt->slotNum) {
    ridx %= mgmt->slotNum;
    atomic_store_16(&mgmt->slotRIdx, ridx);
  }

  SCtgRentSlot *slot = &mgmt->slots[ridx];
  int32_t       code = 0;

  CTG_LOCK(CTG_READ, &slot->lock);
  if (NULL == slot->meta) {
    qDebug("empty meta in slot:%d, type:%d", ridx, mgmt->type);
    goto _return;
  }

  size_t metaNum = taosArrayGetSize(slot->meta);
  if (metaNum <= 0) {
    qDebug("no meta in slot:%d, type:%d", ridx, mgmt->type);
    goto _return;
  }

  size_t msize = metaNum * size;
  *res = taosMemoryMalloc(msize);
  if (NULL == *res) {
    qError("malloc %d failed", (int32_t)msize);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  // All metaNum records are copied in one memcpy starting at element 0.
  void *meta = taosArrayGet(slot->meta, 0);

  memcpy(*res, meta, msize);

  *num = (uint32_t)metaNum;

  qDebug("Got %d meta from rent, type:%d", (int32_t)metaNum, mgmt->type);

_return:

  CTG_UNLOCK(CTG_READ, &slot->lock);

  CTG_RET(code);
}

/*
 * Rate-limited front end of ctgMetaRentGetImpl().
 *
 * When less than CTG_RENT_SLOT_SECOND seconds have passed since the last
 * accepted read, *res is set to NULL, *num to 0 and TSDB_CODE_SUCCESS is
 * returned. Otherwise the last-read timestamp is advanced with a
 * compare-and-exchange (retried when another update got in between) and the
 * next slot is read.
 */
int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
  for (;;) {
    int64_t nowMs = taosGetTimestampMs();
    int64_t lastMs = atomic_load_64(&mgmt->lastReadMsec);

    if ((nowMs - lastMs) < CTG_RENT_SLOT_SECOND * 1000) {
      *res = NULL;
      *num = 0;
      qDebug("too short time period to get expired meta, type:%d", mgmt->type);
      return TSDB_CODE_SUCCESS;
    }

    // The exchange returns the previous value; a match means our update was applied.
    if (lastMs == atomic_val_compare_exchange_64(&mgmt->lastReadMsec, lastMs, nowMs)) {
      break;
    }
  }

  CTG_ERR_RET(ctgMetaRentGetImpl(mgmt, res, num, size));

  return TSDB_CODE_SUCCESS;
}

/*
 * Create an empty cache entry for a database and insert it into pCtg->dbCache.
 *
 * pCtg    - catalog owning the cache
 * dbFName - database name used as the hash key
 * dbId    - database id stored in the new entry
 *
 * Two hash tables (tbCache, stbCache) are created for the entry. Databases
 * whose name is not a system database name are also registered in the db rent
 * with vgVersion -1. If the entry cannot be inserted (including the case that
 * the name is already present), the freshly created tables are released and
 * the error code is returned.
 */
int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
  int32_t     code = 0;
  SCtgDBCache newDBCache = {0};

  newDBCache.dbId = dbId;

  newDBCache.tbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY),
                                    true, HASH_ENTRY_LOCK);
  if (NULL == newDBCache.tbCache) {
    ctgError("taosHashInit %d metaCache failed", gCtgMgmt.cfg.maxTblCacheNum);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  newDBCache.stbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT),
                                     true, HASH_ENTRY_LOCK);
  if (NULL == newDBCache.stbCache) {
    ctgError("taosHashInit %d stbCache failed", gCtgMgmt.cfg.maxTblCacheNum);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  code = taosHashPut(pCtg->dbCache, dbFName, strlen(dbFName), &newDBCache, sizeof(SCtgDBCache));
  if (0 != code) {
    if (!HASH_NODE_EXIST(code)) {
      ctgError("taosHashPut db to cache failed, dbFName:%s", dbFName);
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }

    // Already present: keep the hash's return code and drop our copy.
    ctgDebug("db already in cache, dbFName:%s", dbFName);
    goto _return;
  }

  CTG_CACHE_STAT_INC(numOfDb, 1);

  ctgDebug("db added to cache, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId);

  // System databases are not tracked in the rent.
  if (IS_SYS_DBNAME(dbFName)) {
    return TSDB_CODE_SUCCESS;
  }

  SDbVgVersion vgVersion = {.dbId = newDBCache.dbId, .vgVersion = -1, .stateTs = 0};
  tstrncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));

  // From here on the entry lives in the hash, so it must not be freed on failure.
  CTG_ERR_RET(ctgMetaRentAdd(&pCtg->dbRent, &vgVersion, dbId, sizeof(SDbVgVersion)));

  ctgDebug("db added to rent, dbFName:%s, vgVersion:%d, dbId:0x%" PRIx64, dbFName, vgVersion.vgVersion, dbId);

  return TSDB_CODE_SUCCESS;

_return:

  ctgFreeDbCache(&newDBCache);

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1393
/*
 * Remove every super table of a database cache from the catalog's stb rent.
 *
 * Walks dbCache->stbCache, whose keys are read as super table uids, and
 * removes each uid from pCtg->stbRent. Removal failures are ignored; only
 * successful removals are logged. Does nothing when the db has no stbCache.
 */
void ctgRemoveStbRent(SCatalog *pCtg, SCtgDBCache *dbCache) {
  if (NULL == dbCache->stbCache) {
    return;
  }

  for (void *pIter = taosHashIterate(dbCache->stbCache, NULL); NULL != pIter;
       pIter = taosHashIterate(dbCache->stbCache, pIter)) {
    uint64_t *suid = taosHashGetKey(pIter, NULL);

    int32_t rc = ctgMetaRentRemove(&pCtg->stbRent, *suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare);
    if (TSDB_CODE_SUCCESS == rc) {
      ctgDebug("stb removed from rent, suid:0x%" PRIx64, *suid);
    }
  }
}

dengyihao's avatar
dengyihao 已提交
1412
/*
 * Remove a database entry from the catalog cache.
 *
 * pCtg    - catalog owning the cache
 * dbCache - entry to remove; its contents are released under its write lock
 * dbFName - hash key of the entry
 *
 * Steps: mark the entry deleted, drop its super tables from the stb rent,
 * free its contents, drop it from the db rent and finally from pCtg->dbCache.
 *
 * System databases are never added to the db rent by ctgAddNewDBCache(), so
 * the rent removal is skipped for them. A failing rent removal no longer
 * aborts the function: the contents of the entry have already been freed at
 * that point, so the hash entry is still removed; the rent error is reported
 * to the caller afterwards.
 *
 * Returns TSDB_CODE_SUCCESS, the rent removal error, or
 * TSDB_CODE_CTG_DB_DROPPED when the key is no longer in the hash.
 */
int32_t ctgRemoveDBFromCache(SCatalog *pCtg, SCtgDBCache *dbCache, const char *dbFName) {
  int32_t  code = TSDB_CODE_SUCCESS;
  uint64_t dbId = dbCache->dbId;

  ctgInfo("start to remove db from cache, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbCache->dbId);

  CTG_LOCK(CTG_WRITE, &dbCache->dbLock);

  atomic_store_8(&dbCache->deleted, 1);
  ctgRemoveStbRent(pCtg, dbCache);
  ctgFreeDbCache(dbCache);

  CTG_UNLOCK(CTG_WRITE, &dbCache->dbLock);

  if (!IS_SYS_DBNAME(dbFName)) {
    code = ctgMetaRentRemove(&pCtg->dbRent, dbId, ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare);
    if (TSDB_CODE_SUCCESS == code) {
      ctgDebug("db removed from rent, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId);
    } else {
      ctgError("db remove from rent failed, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId);
    }
  }

  if (taosHashRemove(pCtg->dbCache, dbFName, strlen(dbFName))) {
    ctgInfo("taosHashRemove from dbCache failed, may be removed, dbFName:%s", dbFName);
    CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
  }

  CTG_CACHE_STAT_DEC(numOfDb, 1);
  ctgInfo("db removed from cache, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId);

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1439 1440
/*
 * Look up the cache entry of a database, creating it when necessary.
 *
 * pCtg    - catalog owning the cache
 * dbFName - database name (hash key)
 * dbId    - expected database id; 0 means "any"
 * pCache  - out: the cache entry as returned by ctgGetDBCache()
 *
 * An existing entry is reused when dbId is 0, when the entry has no id yet
 * (it then adopts dbId), or when the ids match. An entry with a different id
 * is removed and a new one is added.
 */
int32_t ctgGetAddDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId, SCtgDBCache **pCache) {
  int32_t      code = 0;
  SCtgDBCache *dbCache = NULL;
  ctgGetDBCache(pCtg, dbFName, &dbCache);

  if (NULL != dbCache) {
    // TODO OPEN IT
#if 0    
    if (dbCache->dbId == dbId) {
      *pCache = dbCache;
      return TSDB_CODE_SUCCESS;
    }
#else
    // An entry without an id adopts the first non-zero id it is asked for.
    if ((0 != dbId) && (0 == dbCache->dbId)) {
      dbCache->dbId = dbId;
    }

    if ((0 == dbId) || (dbCache->dbId == dbId)) {
      *pCache = dbCache;
      return TSDB_CODE_SUCCESS;
    }
#endif
    // Id mismatch: the cached entry is dropped before a new one is added.
    CTG_ERR_RET(ctgRemoveDBFromCache(pCtg, dbCache, dbFName));
  }

  CTG_ERR_RET(ctgAddNewDBCache(pCtg, dbFName, dbId));

  ctgGetDBCache(pCtg, dbFName, &dbCache);

  *pCache = dbCache;

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1480 1481
// Push the versions currently held in `pCache` for super table `tbName` into the catalog's stb rent.
//
//   pCtg    - catalog handle owning `stbRent`
//   dbFName - full db name, copied into the rent entry
//   tbName  - super table name, copied into the rent entry
//   dbId    - db id stored in the rent entry
//   suid    - super table uid, also used as the rent key
//   pCache  - table cache entry; its meta and index are both optional
//
// Returns TSDB_CODE_SUCCESS or the error from ctgMetaRentUpdate.
int32_t ctgUpdateRentStbVersion(SCatalog *pCtg, char *dbFName, char *tbName, uint64_t dbId, uint64_t suid,
                                SCtgTbCache *pCache) {
  SSTableVersion rentEntry = {0};

  rentEntry.dbId = dbId;
  rentEntry.suid = suid;
  tstrncpy(rentEntry.dbFName, dbFName, sizeof(rentEntry.dbFName));
  tstrncpy(rentEntry.stbName, tbName, sizeof(rentEntry.stbName));

  // versions stay 0 for whichever part (meta / index) is not cached
  if (NULL != pCache->pMeta) {
    rentEntry.sversion = pCache->pMeta->sversion;
    rentEntry.tversion = pCache->pMeta->tversion;
  }

  if (NULL != pCache->pIndex) {
    rentEntry.smaVer = pCache->pIndex->version;
  }

  CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->stbRent, &rentEntry, rentEntry.suid, sizeof(SSTableVersion),
                                ctgStbVersionSortCompare, ctgStbVersionSearchCompare));

  ctgDebug("db %s,0x%" PRIx64 " stb %s,0x%" PRIx64 " sver %d tver %d smaVer %d updated to stbRent", dbFName, dbId,
           tbName, suid, rentEntry.sversion, rentEntry.tversion, rentEntry.smaVer);

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
1503

dengyihao's avatar
dengyihao 已提交
1504 1505
// Install `meta` as the cached meta of table `tbName`; for a super table also refresh the
// suid -> name map (stbCache) and the stb rent.
//
// Ownership: `meta` is consumed on every path - it is either stored in the table cache or freed here.
//
//   pCtg     - catalog handle
//   dbCache  - db cache entry that receives the meta
//   dbFName  - full db name (logging and rent entry)
//   dbId     - db id forwarded to the rent entry
//   tbName   - table name, key of tbCache
//   meta     - heap allocated table meta
//   metaSize - not used by this function
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_CTG_DB_DROPPED when the db hashes are already gone,
// TSDB_CODE_OUT_OF_MEMORY when a hash insert fails, or the error of the rent update.
int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, uint64_t dbId, char *tbName,
                              STableMeta *meta, int32_t metaSize) {
  if (NULL == dbCache->tbCache || NULL == dbCache->stbCache) {
    taosMemoryFree(meta);
    ctgError("db is dropping, dbId:0x%" PRIx64, dbCache->dbId);
    CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
  }

  size_t       nameLen = strlen(tbName);
  bool         isStb = (TSDB_SUPER_TABLE == meta->tableType);
  SCtgTbCache *pCache = taosHashGet(dbCache->tbCache, tbName, nameLen);
  STableMeta  *orig = (NULL != pCache) ? pCache->pMeta : NULL;

  if (NULL != orig) {
    int8_t origType = orig->tableType;
    bool   sameTable = (origType == meta->tableType) && (orig->uid == meta->uid);

    // The version fields are only compared for a same-typed, non-child table; keep the short
    // circuit so they are never read otherwise.
    if (sameTable && (TSDB_CHILD_TABLE == origType ||
                      (orig->sversion >= meta->sversion && orig->tversion >= meta->tversion))) {
      taosMemoryFree(meta);
      ctgDebug("ignore table %s meta update", tbName);
      return TSDB_CODE_SUCCESS;
    }

    // the cached super table is being replaced, so its suid mapping goes first
    if (TSDB_SUPER_TABLE == origType) {
      if (0 == taosHashRemove(dbCache->stbCache, &orig->suid, sizeof(orig->suid))) {
        CTG_CACHE_STAT_DEC(numOfStb, 1);
        ctgDebug("stb removed from stbCache, dbFName:%s, stb:%s, suid:0x%" PRIx64, dbFName, tbName, orig->suid);
      } else {
        ctgError("stb not exist in stbCache, dbFName:%s, stb:%s, suid:0x%" PRIx64, dbFName, tbName, orig->suid);
      }
    }
  }

  if (NULL != pCache) {
    // existing entry: swap the meta under the meta lock
    CTG_LOCK(CTG_WRITE, &pCache->metaLock);
    taosMemoryFree(pCache->pMeta);
    pCache->pMeta = meta;
    CTG_UNLOCK(CTG_WRITE, &pCache->metaLock);
  } else {
    // no entry yet: insert a new one and fetch the stored copy
    SCtgTbCache cache = {0};
    cache.pMeta = meta;
    if (taosHashPut(dbCache->tbCache, tbName, nameLen, &cache, sizeof(SCtgTbCache)) != 0) {
      ctgError("taosHashPut new tbCache failed, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
      taosMemoryFree(meta);
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    pCache = taosHashGet(dbCache->tbCache, tbName, nameLen);
  }

  // only the pointer value of `orig` is inspected here; the meta it referred to may be freed above
  if (NULL == orig) {
    CTG_CACHE_STAT_INC(numOfTbl, 1);
  }

  ctgDebug("tbmeta updated to cache, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
  ctgdShowTableMeta(pCtg, tbName, meta);

  if (!isStb) {
    return TSDB_CODE_SUCCESS;
  }

  if (taosHashPut(dbCache->stbCache, &meta->suid, sizeof(meta->suid), tbName, nameLen + 1) != 0) {
    ctgError("taosHashPut to stable cache failed, suid:0x%" PRIx64, meta->suid);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  CTG_CACHE_STAT_INC(numOfStb, 1);

  ctgDebug("stb 0x%" PRIx64 " updated to cache, dbFName:%s, tbName:%s, tbType:%d", meta->suid, dbFName, tbName,
           meta->tableType);

  if (pCache) {
    CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbId, meta->suid, pCache));
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1582
// Install `*index` as the cached index of table `tbName` and, when a suid is known, refresh the
// stb rent entry.
//
// Ownership: `*index` is freed here when the db is dropping or when inserting a new cache entry
// fails; once it has been stored in the cache `*index` is set to NULL.
//
//   pCtg    - catalog handle
//   dbCache - db cache entry that receives the index
//   dbFName - full db name (forwarded to the rent update)
//   tbName  - table name, key of tbCache
//   index   - in/out pointer to the heap allocated index
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_CTG_DB_DROPPED, TSDB_CODE_OUT_OF_MEMORY, or the error of
// the rent update.
int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, char *tbName, STableIndex **index) {
  if (NULL == dbCache->tbCache) {
    ctgFreeSTableIndex(*index);
    taosMemoryFreeClear(*index);
    ctgError("db is dropping, dbId:0x%" PRIx64, dbCache->dbId);
    CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
  }

  STableIndex *pIndex = *index;
  uint64_t     suid = pIndex->suid;
  size_t       nameLen = strlen(tbName);
  SCtgTbCache *pCache = taosHashGet(dbCache->tbCache, tbName, nameLen);

  if (NULL != pCache) {
    // existing entry: replace the index under the index lock
    CTG_LOCK(CTG_WRITE, &pCache->indexLock);

    if (pCache->pIndex) {
      // the incoming index carries no suid, keep the one recorded by the index it replaces
      if (0 == suid) {
        suid = pCache->pIndex->suid;
      }
      taosArrayDestroyEx(pCache->pIndex->pIndex, tFreeSTableIndexInfo);
      taosMemoryFreeClear(pCache->pIndex);
    }

    pCache->pIndex = pIndex;
    CTG_UNLOCK(CTG_WRITE, &pCache->indexLock);

    *index = NULL;

    ctgDebug("table %s index updated to cache, ver:%d, num:%d", tbName, pIndex->version,
             (int32_t)taosArrayGetSize(pIndex->pIndex));

    if (suid) {
      CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbCache->dbId, suid, pCache));
    }

    return TSDB_CODE_SUCCESS;
  }

  // no entry yet: insert one that only holds the index
  SCtgTbCache cache = {0};
  cache.pIndex = pIndex;

  if (taosHashPut(dbCache->tbCache, tbName, nameLen, &cache, sizeof(cache)) != 0) {
    ctgFreeSTableIndex(*index);
    taosMemoryFreeClear(*index);
    ctgError("taosHashPut new tbCache failed, tbName:%s", tbName);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  *index = NULL;
  ctgDebug("table %s index updated to cache, ver:%d, num:%d", tbName, pIndex->version,
           (int32_t)taosArrayGetSize(pIndex->pIndex));

  if (suid) {
    // the local copy is enough for the rent update, it carries the same index pointer
    CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbCache->dbId, pIndex->suid, &cache));
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1640 1641 1642 1643
// Clone `pOut` and hand the clone to ctgUpdateTbMetaEnqueue.
//
//   pCtg    - catalog handle
//   pOut    - table meta output to clone; the caller's copy is not modified here
//   syncReq - forwarded unchanged to ctgUpdateTbMetaEnqueue
//
// Returns TSDB_CODE_SUCCESS, or the error of the clone / enqueue step.
int32_t ctgUpdateTbMetaToCache(SCatalog *pCtg, STableMetaOutput *pOut, bool syncReq) {
  int32_t           code = 0;
  STableMetaOutput *pClone = NULL;

  CTG_ERR_RET(ctgCloneMetaOutput(pOut, &pClone));

  // the local pointer is dropped right after the call, whatever its result
  code = ctgUpdateTbMetaEnqueue(pCtg, pClone, syncReq);
  pClone = NULL;
  CTG_ERR_JRET(code);

  return TSDB_CODE_SUCCESS;

_return:

  ctgFreeSTableMetaOutput(pClone);
  CTG_RET(code);
}

D
dapan1121 已提交
1657
void ctgClearAllInstance(void) {
dengyihao's avatar
dengyihao 已提交
1658
  SCatalog *pCtg = NULL;
1659

dengyihao's avatar
dengyihao 已提交
1660
  void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
1661
  while (pIter) {
dengyihao's avatar
dengyihao 已提交
1662
    pCtg = *(SCatalog **)pIter;
1663 1664

    if (pCtg) {
D
dapan1121 已提交
1665 1666 1667 1668 1669 1670 1671 1672
      ctgClearHandle(pCtg);
    }

    pIter = taosHashIterate(gCtgMgmt.pCluster, pIter);
  }
}

void ctgFreeAllInstance(void) {
dengyihao's avatar
dengyihao 已提交
1673
  SCatalog *pCtg = NULL;
D
dapan1121 已提交
1674

dengyihao's avatar
dengyihao 已提交
1675
  void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
D
dapan1121 已提交
1676
  while (pIter) {
dengyihao's avatar
dengyihao 已提交
1677
    pCtg = *(SCatalog **)pIter;
D
dapan1121 已提交
1678 1679 1680

    if (pCtg) {
      ctgFreeHandle(pCtg);
1681 1682 1683 1684 1685 1686 1687 1688
    }

    pIter = taosHashIterate(gCtgMgmt.pCluster, pIter);
  }

  taosHashClear(gCtgMgmt.pCluster);
}

X
Xiaoyu Wang 已提交
1689 1690 1691
int32_t ctgVgInfoIdComp(void const *lp, void const *rp) {
  int32_t     *key = (int32_t *)lp;
  SVgroupInfo *pVg = (SVgroupInfo *)rp;
1692 1693 1694 1695 1696 1697 1698 1699 1700 1701

  if (*key < pVg->vgId) {
    return -1;
  } else if (*key > pVg->vgId) {
    return 1;
  }

  return 0;
}

D
dapan1121 已提交
1702
// Cache operation: install the vgroup info carried by the message as the db's cached vgInfo and
// refresh the db rent entry.
//
// The message and any vgroup info still attached to it are released before returning; the vgroup
// info is detached from the message once the cache has taken it over.
//
// Returns TSDB_CODE_SUCCESS (also when the update is skipped or ignored as stale/unchanged),
// TSDB_CODE_APP_ERROR for an invalid vgInfo, TSDB_CODE_CTG_INTERNAL_ERROR when no db cache entry
// could be obtained, or the error of a failing helper.
int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
  int32_t          code = 0;
  SCtgUpdateVgMsg *msg = operation->data;
  SDBVgInfo       *dbInfo = msg->dbInfo;
  char            *dbFName = msg->dbFName;
  SCatalog        *pCtg = msg->pCtg;

  if (pCtg->stopUpdate || NULL == dbInfo->vgHash) {
    goto _return;
  }

  if (dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) {
    ctgDebug("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d, vgHashSize:%d", dbFName, dbInfo->vgHash,
             dbInfo->vgVersion, taosHashGetSize(dbInfo->vgHash));
    CTG_ERR_JRET(TSDB_CODE_APP_ERROR);
  }

  // snapshot the version fields now: dbInfo is handed over to the cache further down
  SDbVgVersion vgVersion = {
      .dbId = msg->dbId, .vgVersion = dbInfo->vgVersion, .numOfTable = dbInfo->numOfTable, .stateTs = dbInfo->stateTs};

  SCtgDBCache *dbCache = NULL;
  CTG_ERR_JRET(ctgGetAddDBCache(msg->pCtg, dbFName, msg->dbId, &dbCache));
  if (NULL == dbCache) {
    ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:0x%" PRIx64, dbFName, msg->dbId);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  SCtgVgCache *vgCache = &dbCache->vgCache;
  CTG_ERR_JRET(ctgWLockVgInfo(msg->pCtg, dbCache));

  if (vgCache->vgInfo) {
    SDBVgInfo *vgInfo = vgCache->vgInfo;

    // older than what is cached: keep the cached info
    if (dbInfo->vgVersion < vgInfo->vgVersion) {
      ctgDebug("db updateVgroup is ignored, dbFName:%s, vgVer:%d, curVer:%d", dbFName, dbInfo->vgVersion,
               vgInfo->vgVersion);
      ctgWUnlockVgInfo(dbCache);

      goto _return;
    }

    // identical to what is cached: nothing to do
    if (dbInfo->vgVersion == vgInfo->vgVersion && dbInfo->numOfTable == vgInfo->numOfTable &&
        dbInfo->stateTs == vgInfo->stateTs) {
      ctgDebug("no new db vgroup update info, dbFName:%s, vgVer:%d, numOfTable:%d, stateTs:%" PRId64, dbFName,
               dbInfo->vgVersion, dbInfo->numOfTable, dbInfo->stateTs);
      ctgWUnlockVgInfo(dbCache);

      goto _return;
    }

    freeVgInfo(vgInfo);
  }

  // the cache now owns dbInfo; detach it so the cleanup below does not free it
  vgCache->vgInfo = dbInfo;
  msg->dbInfo = NULL;

  ctgDebug("db vgInfo updated, dbFName:%s, vgVer:%d, stateTs:%" PRId64 ", dbId:0x%" PRIx64, dbFName,
           vgVersion.vgVersion, vgVersion.stateTs, vgVersion.dbId);

  ctgWUnlockVgInfo(dbCache);

  tstrncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
  CTG_ERR_JRET(ctgMetaRentUpdate(&msg->pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion),
                                 ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare));

_return:

  freeVgInfo(msg->dbInfo);
  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
1780
// Cache operation: remove the db named in the message from the cache.
//
// Nothing is removed when updates are stopped, when the db is not cached, or when the message
// carries a non-zero db id that differs from the cached one. The message is always freed.
//
// Returns TSDB_CODE_SUCCESS or the error of ctgRemoveDBFromCache.
int32_t ctgOpDropDbCache(SCtgCacheOperation *operation) {
  int32_t        code = 0;
  SCtgDropDBMsg *msg = operation->data;
  SCatalog      *pCtg = msg->pCtg;
  SCtgDBCache   *dbCache = NULL;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  ctgGetDBCache(msg->pCtg, msg->dbFName, &dbCache);
  if (NULL == dbCache) {
    goto _return;
  }

  if (0 == msg->dbId || dbCache->dbId == msg->dbId) {
    CTG_ERR_JRET(ctgRemoveDBFromCache(pCtg, dbCache, msg->dbFName));
  } else {
    // the cached entry has a different id than the one being dropped, leave it alone
    ctgInfo("dbId already updated, dbFName:%s, dbId:0x%" PRIx64 ", targetId:0x%" PRIx64, msg->dbFName, dbCache->dbId,
            msg->dbId);
  }

_return:

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
1810
// Cache operation: free the cached vgroup info of the db named in the message.
//
// Nothing happens when updates are stopped or the db is not cached. The message is always freed.
//
// Returns TSDB_CODE_SUCCESS or the error of ctgWLockVgInfo.
int32_t ctgOpDropDbVgroup(SCtgCacheOperation *operation) {
  int32_t              code = 0;
  SCtgDropDbVgroupMsg *msg = operation->data;
  SCatalog            *pCtg = msg->pCtg;
  SCtgDBCache         *dbCache = NULL;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  ctgGetDBCache(msg->pCtg, msg->dbFName, &dbCache);
  if (NULL == dbCache) {
    goto _return;
  }

  CTG_ERR_JRET(ctgWLockVgInfo(pCtg, dbCache));

  // release the vgroup info under the write lock and leave the slot empty
  freeVgInfo(dbCache->vgCache.vgInfo);
  dbCache->vgCache.vgInfo = NULL;

  ctgDebug("db vgInfo removed, dbFName:%s", msg->dbFName);

  ctgWUnlockVgInfo(dbCache);

_return:

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
1841
// Cache operation: write the table meta carried by the message into the db's table cache.
//
// Depending on the meta type the message carries a table meta, a child table meta, or both; each
// present part is written with ctgWriteTbMetaToCache. The message, its meta output and any table
// meta not handed to the cache are freed before returning.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_CTG_INTERNAL_ERROR for an inconsistent message or a missing
// db cache entry, TSDB_CODE_OUT_OF_MEMORY, or the error of a failing helper.
int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *operation) {
  int32_t              code = 0;
  SCtgUpdateTbMetaMsg *msg = operation->data;
  SCatalog            *pCtg = msg->pCtg;
  STableMetaOutput    *pMeta = msg->pMeta;
  SCtgDBCache         *dbCache = NULL;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  // anything but a pure child table update must carry a table meta
  if (NULL == pMeta->tbMeta && !CTG_IS_META_CTABLE(pMeta->metaType)) {
    ctgError("no valid tbmeta got from meta rsp, dbFName:%s, tbName:%s", pMeta->dbFName, pMeta->tbName);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  // a combined update must carry a super table meta
  if (CTG_IS_META_BOTH(pMeta->metaType) && TSDB_SUPER_TABLE != pMeta->tbMeta->tableType) {
    ctgError("table type error, expected:%d, actual:%d", TSDB_SUPER_TABLE, pMeta->tbMeta->tableType);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  CTG_ERR_JRET(ctgGetAddDBCache(pCtg, pMeta->dbFName, pMeta->dbId, &dbCache));
  if (NULL == dbCache) {
    ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:0x%" PRIx64, pMeta->dbFName, pMeta->dbId);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  if (CTG_IS_META_TABLE(pMeta->metaType) || CTG_IS_META_BOTH(pMeta->metaType)) {
    int32_t metaSize = CTG_META_SIZE(pMeta->tbMeta);

    // the callee consumes tbMeta on every path, so detach it before looking at the result
    code = ctgWriteTbMetaToCache(pCtg, dbCache, pMeta->dbFName, pMeta->dbId, pMeta->tbName, pMeta->tbMeta, metaSize);
    pMeta->tbMeta = NULL;
    CTG_ERR_JRET(code);
  }

  if (CTG_IS_META_CTABLE(pMeta->metaType) || CTG_IS_META_BOTH(pMeta->metaType)) {
    // the child table meta is embedded in the message; the cache needs its own heap copy
    SCTableMeta *ctbMeta = taosMemoryMalloc(sizeof(SCTableMeta));
    if (NULL == ctbMeta) {
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }

    memcpy(ctbMeta, &pMeta->ctbMeta, sizeof(SCTableMeta));
    CTG_ERR_JRET(ctgWriteTbMetaToCache(pCtg, dbCache, pMeta->dbFName, pMeta->dbId, pMeta->ctbName,
                                       (STableMeta *)ctbMeta, sizeof(SCTableMeta)));
  }

_return:

  taosMemoryFreeClear(pMeta->tbMeta);
  taosMemoryFreeClear(pMeta);

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
1895
// Cache operation: drop a super table from the suid map, the table cache and the stb rent.
//
// Nothing is dropped when updates are stopped, when the db is not cached, or when the message
// carries a non-zero db id that differs from the cached one. The message is always freed.
//
// Returns TSDB_CODE_SUCCESS or the error of ctgMetaRentRemove.
int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) {
  int32_t             code = 0;
  SCtgDropStbMetaMsg *msg = operation->data;
  SCatalog           *pCtg = msg->pCtg;
  SCtgDBCache        *dbCache = NULL;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  ctgGetDBCache(pCtg, msg->dbFName, &dbCache);
  if (NULL == dbCache) {
    goto _return;
  }

  if (msg->dbId && (dbCache->dbId != msg->dbId)) {
    ctgDebug("dbId already modified, dbFName:%s, current:0x%" PRIx64 ", dbId:0x%" PRIx64 ", stb:%s, suid:0x%" PRIx64,
             msg->dbFName, dbCache->dbId, msg->dbId, msg->stbName, msg->suid);
    goto _return;
  }

  // step 1: suid -> name mapping
  if (0 == taosHashRemove(dbCache->stbCache, &msg->suid, sizeof(msg->suid))) {
    CTG_CACHE_STAT_DEC(numOfStb, 1);
  } else {
    ctgDebug("stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:0x%" PRIx64, msg->dbFName,
             msg->stbName, msg->suid);
  }

  // step 2: table cache entry, its content is released under the meta lock before the removal
  size_t       nameLen = strlen(msg->stbName);
  SCtgTbCache *pTbCache = taosHashGet(dbCache->tbCache, msg->stbName, nameLen);
  if (NULL == pTbCache) {
    ctgDebug("stb %s already not in cache", msg->stbName);
    goto _return;
  }

  CTG_LOCK(CTG_WRITE, &pTbCache->metaLock);
  ctgFreeTbCacheImpl(pTbCache);
  CTG_UNLOCK(CTG_WRITE, &pTbCache->metaLock);

  if (0 == taosHashRemove(dbCache->tbCache, msg->stbName, nameLen)) {
    CTG_CACHE_STAT_DEC(numOfTbl, 1);
  } else {
    ctgError("stb not exist in cache, dbFName:%s, stb:%s, suid:0x%" PRIx64, msg->dbFName, msg->stbName, msg->suid);
  }

  ctgInfo("stb removed from cache, dbFName:%s, stbName:%s, suid:0x%" PRIx64, msg->dbFName, msg->stbName, msg->suid);

  // step 3: rent entry
  CTG_ERR_JRET(ctgMetaRentRemove(&msg->pCtg->stbRent, msg->suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare));

  ctgDebug("stb removed from rent, dbFName:%s, stbName:%s, suid:0x%" PRIx64, msg->dbFName, msg->stbName, msg->suid);

_return:

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}
D
dapan1121 已提交
1952
// Cache operation: drop one table from the db's table cache.
//
// Nothing is dropped when updates are stopped, when the db is not cached, when the db id of the
// message differs from the cached one, or when the table is not cached. The message is always freed.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CTG_INTERNAL_ERROR when the hash removal fails.
int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) {
  int32_t             code = 0;
  SCtgDropTblMetaMsg *msg = operation->data;
  SCatalog           *pCtg = msg->pCtg;
  SCtgDBCache        *dbCache = NULL;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  ctgGetDBCache(pCtg, msg->dbFName, &dbCache);
  if (NULL == dbCache) {
    goto _return;
  }

  if (dbCache->dbId != msg->dbId) {
    ctgDebug("dbId 0x%" PRIx64 " not match with curId 0x%" PRIx64 ", dbFName:%s, tbName:%s", msg->dbId, dbCache->dbId,
             msg->dbFName, msg->tbName);
    goto _return;
  }

  size_t       nameLen = strlen(msg->tbName);
  SCtgTbCache *pTbCache = taosHashGet(dbCache->tbCache, msg->tbName, nameLen);
  if (NULL == pTbCache) {
    ctgDebug("tb %s already not in cache", msg->tbName);
    goto _return;
  }

  // release the entry's content under the meta lock, then remove the entry itself
  CTG_LOCK(CTG_WRITE, &pTbCache->metaLock);
  ctgFreeTbCacheImpl(pTbCache);
  CTG_UNLOCK(CTG_WRITE, &pTbCache->metaLock);

  if (0 != taosHashRemove(dbCache->tbCache, msg->tbName, nameLen)) {
    ctgError("tb %s not exist in cache, dbFName:%s", msg->tbName, msg->dbFName);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  CTG_CACHE_STAT_DEC(numOfTbl, 1);

  ctgDebug("table %s removed from cache, dbFName:%s", msg->tbName, msg->dbFName);

_return:

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
1999
// Cache operation: store the user auth info carried by the message in the user cache.
//
// New user: the whole auth struct (including its hash pointers) is copied into a new cache entry,
// so the hashes are not cleaned up here on success.
// Known user: each cached hash is destroyed and replaced by the one from the message under the
// user's write lock; the message's pointer is cleared after each hand-over.
// The message is freed on every path.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the new entry cannot be inserted.
int32_t ctgOpUpdateUser(SCtgCacheOperation *operation) {
  int32_t            code = 0;
  SCtgUpdateUserMsg *msg = operation->data;
  SCatalog          *pCtg = msg->pCtg;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  size_t        userLen = strlen(msg->userAuth.user);
  SCtgUserAuth *pUser = (SCtgUserAuth *)taosHashGet(pCtg->userCache, msg->userAuth.user, userLen);

  if (NULL != pUser) {
    CTG_LOCK(CTG_WRITE, &pUser->lock);

    taosHashCleanup(pUser->userAuth.createdDbs);
    pUser->userAuth.createdDbs = msg->userAuth.createdDbs;
    msg->userAuth.createdDbs = NULL;

    taosHashCleanup(pUser->userAuth.readDbs);
    pUser->userAuth.readDbs = msg->userAuth.readDbs;
    msg->userAuth.readDbs = NULL;

    taosHashCleanup(pUser->userAuth.writeDbs);
    pUser->userAuth.writeDbs = msg->userAuth.writeDbs;
    msg->userAuth.writeDbs = NULL;

    taosHashCleanup(pUser->userAuth.readTbs);
    pUser->userAuth.readTbs = msg->userAuth.readTbs;
    msg->userAuth.readTbs = NULL;

    taosHashCleanup(pUser->userAuth.writeTbs);
    pUser->userAuth.writeTbs = msg->userAuth.writeTbs;
    msg->userAuth.writeTbs = NULL;

    taosHashCleanup(pUser->userAuth.useDbs);
    pUser->userAuth.useDbs = msg->userAuth.useDbs;
    msg->userAuth.useDbs = NULL;

    CTG_UNLOCK(CTG_WRITE, &pUser->lock);

    // every hash pointer in the message is NULL by now, the cleanup below only frees the message
    goto _return;
  }

  SCtgUserAuth userAuth = {0};

  memcpy(&userAuth.userAuth, &msg->userAuth, sizeof(msg->userAuth));

  if (taosHashPut(pCtg->userCache, msg->userAuth.user, userLen, &userAuth, sizeof(userAuth))) {
    ctgError("taosHashPut user %s to cache failed", msg->userAuth.user);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  // the cache entry now refers to the hashes, only the message itself is released
  taosMemoryFreeClear(msg);

  return TSDB_CODE_SUCCESS;

_return:

  taosHashCleanup(msg->userAuth.createdDbs);
  taosHashCleanup(msg->userAuth.readDbs);
  taosHashCleanup(msg->userAuth.writeDbs);
  taosHashCleanup(msg->userAuth.readTbs);
  taosHashCleanup(msg->userAuth.writeTbs);
  taosHashCleanup(msg->userAuth.useDbs);

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
2066
// Cache operation: overwrite the endpoint set of one vgroup in the cached vgroup info of a db.
//
// The vgroup is looked up both in the vgroup hash and in the vgroup array, and both copies get the
// new endpoint set. The update is skipped when updates are stopped, the db or its vgroup info is
// not cached, or the vgroup is missing from either container. The message is always freed.
//
// Returns TSDB_CODE_SUCCESS or the error of ctgGetDBCache / ctgWLockVgInfo.
int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation) {
  int32_t             code = 0;
  SCtgUpdateEpsetMsg *msg = operation->data;
  SCatalog           *pCtg = msg->pCtg;
  SCtgDBCache        *dbCache = NULL;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  CTG_ERR_JRET(ctgGetDBCache(pCtg, msg->dbFName, &dbCache));
  if (NULL == dbCache) {
    ctgDebug("db %s not exist, ignore epset update", msg->dbFName);
    goto _return;
  }

  CTG_ERR_JRET(ctgWLockVgInfo(pCtg, dbCache));

  SDBVgInfo *vgInfo = dbCache->vgCache.vgInfo;
  if (NULL == vgInfo) {
    ctgDebug("vgroup in db %s not cached, ignore epset update", msg->dbFName);
    goto _return;
  }

  SVgroupInfo *pHashVg = taosHashGet(vgInfo->vgHash, &msg->vgId, sizeof(msg->vgId));
  if (NULL == pHashVg) {
    ctgDebug("no vgroup %d in db %s vgHash, ignore epset update", msg->vgId, msg->dbFName);
    goto _return;
  }

  SVgroupInfo *pArrayVg = taosArraySearch(vgInfo->vgArray, &msg->vgId, ctgVgInfoIdComp, TD_EQ);
  if (NULL == pArrayVg) {
    ctgDebug("no vgroup %d in db %s vgArray, ignore epset update", msg->vgId, msg->dbFName);
    goto _return;
  }

  // log the endpoint in use before and after the change
  SEp *pOldEp = &pHashVg->epSet.eps[pHashVg->epSet.inUse];
  SEp *pNewEp = &msg->epSet.eps[msg->epSet.inUse];
  ctgDebug("vgroup %d epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d, dbFName:%s in ctg", pHashVg->vgId,
           pHashVg->epSet.inUse, pHashVg->epSet.numOfEps, pOldEp->fqdn, pOldEp->port, msg->epSet.inUse,
           msg->epSet.numOfEps, pNewEp->fqdn, pNewEp->port, msg->dbFName);

  pHashVg->epSet = msg->epSet;
  pArrayVg->epSet = msg->epSet;

_return:

  // NOTE(review): unlocking only on success assumes a failing ctgWLockVgInfo does not leave the
  // lock held - confirm against its implementation
  if (TSDB_CODE_SUCCESS == code && NULL != dbCache) {
    ctgWUnlockVgInfo(dbCache);
  }

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
2122
// Cache operation: write the table index carried by the message into the db's table cache.
//
// ctgWriteTbIndexToCache clears the local index pointer once the index is stored or freed; an
// index that is still attached afterwards is released here. The message is always freed.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_CTG_INTERNAL_ERROR when no db cache entry could be
// obtained, or the error of a failing helper.
int32_t ctgOpUpdateTbIndex(SCtgCacheOperation *operation) {
  int32_t               code = 0;
  SCtgUpdateTbIndexMsg *msg = operation->data;
  SCatalog             *pCtg = msg->pCtg;
  STableIndex          *pIndex = msg->pIndex;
  SCtgDBCache          *dbCache = NULL;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  CTG_ERR_JRET(ctgGetAddDBCache(pCtg, pIndex->dbFName, 0, &dbCache));
  if (NULL == dbCache) {
    // same handling as the other users of ctgGetAddDBCache: the lookup may succeed without an entry
    ctgInfo("conflict db update, ignore this update, dbFName:%s", pIndex->dbFName);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  CTG_ERR_JRET(ctgWriteTbIndexToCache(pCtg, dbCache, pIndex->dbFName, pIndex->tbName, &pIndex));

_return:

  if (pIndex) {
    taosArrayDestroyEx(pIndex->pIndex, tFreeSTableIndexInfo);
    taosMemoryFreeClear(pIndex);
  }

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

// Cache operation: replace the cached index of a table with an empty index whose version is -1.
//
// Nothing is written when updates are stopped or when the db is not cached. The message is freed
// on every path, and so is the placeholder index unless the cache took it over.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the error of a failing helper.
int32_t ctgOpDropTbIndex(SCtgCacheOperation *operation) {
  int32_t             code = 0;
  SCtgDropTbIndexMsg *msg = operation->data;
  SCatalog           *pCtg = msg->pCtg;
  SCtgDBCache        *dbCache = NULL;
  // initialised up front: every `goto _return` below reaches the cleanup that inspects it
  STableIndex        *pIndex = NULL;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  CTG_ERR_JRET(ctgGetDBCache(pCtg, msg->dbFName, &dbCache));
  if (NULL == dbCache) {
    // nothing cached for this db; still go through the cleanup so the message is released
    goto _return;
  }

  pIndex = taosMemoryCalloc(1, sizeof(STableIndex));
  if (NULL == pIndex) {
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  // bounded copies into the fixed-size name buffers of the placeholder
  tstrncpy(pIndex->tbName, msg->tbName, sizeof(pIndex->tbName));
  tstrncpy(pIndex->dbFName, msg->dbFName, sizeof(pIndex->dbFName));
  pIndex->version = -1;

  CTG_ERR_JRET(ctgWriteTbIndexToCache(pCtg, dbCache, pIndex->dbFName, pIndex->tbName, &pIndex));

_return:

  if (pIndex) {
    taosArrayDestroyEx(pIndex->pIndex, tFreeSTableIndexInfo);
    taosMemoryFreeClear(pIndex);
  }

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
2186
// Cache operation: clear or free catalog handles while holding the gCtgMgmt write lock.
//
// With a catalog in the message only that handle is processed, otherwise every registered
// instance is. `freeCtg` selects freeing over clearing. The message is always freed.
//
// Always returns TSDB_CODE_SUCCESS.
int32_t ctgOpClearCache(SCtgCacheOperation *operation) {
  int32_t            code = 0;
  SCtgClearCacheMsg *msg = operation->data;
  SCatalog          *pCtg = msg->pCtg;

  CTG_LOCK(CTG_WRITE, &gCtgMgmt.lock);

  if (pCtg) {
    // single handle
    if (msg->freeCtg) {
      ctgFreeHandle(pCtg);
    } else {
      ctgClearHandle(pCtg);
    }
  } else {
    // all handles
    if (msg->freeCtg) {
      ctgFreeAllInstance();
    } else {
      ctgClearAllInstance();
    }
  }

  CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.lock);

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

2218 2219 2220 2221 2222 2223 2224 2225
/*
 * Release the payload (op->data) of a cache operation that will not be
 * executed, including any resources owned by that payload.
 *
 * Only op->data is released; the operation object itself stays valid and
 * must be disposed of by the caller. A NULL op or NULL op->data is a no-op.
 * An unknown opId is logged and the payload is left untouched.
 *
 * @param op  cache operation whose payload should be freed (may be NULL)
 */
void ctgFreeCacheOperationData(SCtgCacheOperation *op) {
  if (NULL == op || NULL == op->data) {
    return;
  }

  switch (op->opId) {
    case CTG_OP_UPDATE_VGROUP: {
      SCtgUpdateVgMsg *msg = op->data;
      freeVgInfo(msg->dbInfo);
      taosMemoryFreeClear(op->data);
      break;
    }
    case CTG_OP_UPDATE_TB_META: {
      SCtgUpdateTbMetaMsg *msg = op->data;
      // guard the nested pointer the same way the TB_INDEX case does, so a
      // message without meta output cannot cause a NULL dereference here
      if (msg->pMeta) {
        taosMemoryFreeClear(msg->pMeta->tbMeta);
        taosMemoryFreeClear(msg->pMeta);
      }
      taosMemoryFreeClear(op->data);
      break;
    }
    // payloads without separately owned members: free the message only
    case CTG_OP_DROP_DB_CACHE:
    case CTG_OP_DROP_DB_VGROUP:
    case CTG_OP_DROP_STB_META:
    case CTG_OP_DROP_TB_META:
    case CTG_OP_UPDATE_VG_EPSET:
    case CTG_OP_DROP_TB_INDEX:
    case CTG_OP_CLEAR_CACHE: {
      taosMemoryFreeClear(op->data);
      break;
    }
    case CTG_OP_UPDATE_USER: {
      SCtgUpdateUserMsg *msg = op->data;
      taosHashCleanup(msg->userAuth.createdDbs);
      taosHashCleanup(msg->userAuth.readDbs);
      taosHashCleanup(msg->userAuth.writeDbs);
      taosHashCleanup(msg->userAuth.readTbs);
      taosHashCleanup(msg->userAuth.writeTbs);
      taosHashCleanup(msg->userAuth.useDbs);
      taosMemoryFreeClear(op->data);
      break;
    }
    case CTG_OP_UPDATE_TB_INDEX: {
      SCtgUpdateTbIndexMsg *msg = op->data;
      if (msg->pIndex) {
        taosArrayDestroyEx(msg->pIndex->pIndex, tFreeSTableIndexInfo);
        taosMemoryFreeClear(msg->pIndex);
      }
      taosMemoryFreeClear(op->data);
      break;
    }
    default: {
      qError("invalid cache op id:%d", op->opId);
      break;
    }
  }
}

D
dapan1121 已提交
2274
void ctgCleanupCacheQueue(void) {
dengyihao's avatar
dengyihao 已提交
2275 2276
  SCtgQNode          *node = NULL;
  SCtgQNode          *nodeNext = NULL;
D
dapan1121 已提交
2277
  SCtgCacheOperation *op = NULL;
dengyihao's avatar
dengyihao 已提交
2278
  bool                stopQueue = false;
D
dapan1121 已提交
2279 2280 2281 2282 2283

  while (true) {
    node = gCtgMgmt.queue.head->next;
    while (node) {
      if (node->op) {
D
dapan1121 已提交
2284 2285 2286 2287 2288 2289
        op = node->op;
        if (op->stopQueue) {
          SCatalog *pCtg = ((SCtgUpdateMsgHeader *)op->data)->pCtg;
          ctgDebug("process [%s] operation", gCtgCacheOperation[op->opId].name);
          (*gCtgCacheOperation[op->opId].func)(op);
          stopQueue = true;
dengyihao's avatar
dengyihao 已提交
2290
          CTG_RT_STAT_INC(numOfOpDequeue, 1);
D
dapan1121 已提交
2291
        } else {
2292
          ctgFreeCacheOperationData(op);
dengyihao's avatar
dengyihao 已提交
2293
          CTG_RT_STAT_INC(numOfOpAbort, 1);
D
dapan1121 已提交
2294
        }
dengyihao's avatar
dengyihao 已提交
2295

D
dapan1121 已提交
2296 2297
        if (op->syncOp) {
          tsem_post(&op->rspSem);
D
dapan1121 已提交
2298
        } else {
D
dapan1121 已提交
2299
          taosMemoryFree(op);
D
dapan1121 已提交
2300
        }
D
dapan1121 已提交
2301
      }
D
dapan1121 已提交
2302 2303 2304

      nodeNext = node->next;
      taosMemoryFree(node);
dengyihao's avatar
dengyihao 已提交
2305

D
dapan1121 已提交
2306
      node = nodeNext;
D
dapan1121 已提交
2307 2308
    }

D
dapan1121 已提交
2309
    if (!stopQueue) {
D
dapan1121 已提交
2310 2311 2312 2313
      taosUsleep(1);
    } else {
      break;
    }
D
dapan1121 已提交
2314 2315 2316 2317 2318 2319
  }

  taosMemoryFreeClear(gCtgMgmt.queue.head);
  gCtgMgmt.queue.tail = NULL;
}

dengyihao's avatar
dengyihao 已提交
2320
void *ctgUpdateThreadFunc(void *param) {
D
dapan1121 已提交
2321
  setThreadName("catalog");
2322

D
dapan1121 已提交
2323 2324 2325 2326 2327 2328
  qInfo("catalog update thread started");

  while (true) {
    if (tsem_wait(&gCtgMgmt.queue.reqSem)) {
      qError("ctg tsem_wait failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
    }
dengyihao's avatar
dengyihao 已提交
2329

2330
    if (atomic_load_8((int8_t *)&gCtgMgmt.queue.stopQueue)) {
D
dapan1121 已提交
2331
      ctgCleanupCacheQueue();
D
dapan1121 已提交
2332 2333 2334
      break;
    }

D
dapan1121 已提交
2335 2336 2337
    SCtgCacheOperation *operation = NULL;
    ctgDequeue(&operation);
    SCatalog *pCtg = ((SCtgUpdateMsgHeader *)operation->data)->pCtg;
D
dapan1121 已提交
2338

D
dapan1121 已提交
2339
    ctgDebug("process [%s] operation", gCtgCacheOperation[operation->opId].name);
dengyihao's avatar
dengyihao 已提交
2340

D
dapan1121 已提交
2341
    (*gCtgCacheOperation[operation->opId].func)(operation);
D
dapan1121 已提交
2342

D
dapan1121 已提交
2343
    if (operation->syncOp) {
D
dapan1121 已提交
2344
      tsem_post(&operation->rspSem);
D
dapan1121 已提交
2345 2346
    } else {
      taosMemoryFreeClear(operation);
D
dapan1121 已提交
2347 2348
    }

dengyihao's avatar
dengyihao 已提交
2349
    CTG_RT_STAT_INC(numOfOpDequeue, 1);
D
dapan1121 已提交
2350

D
dapan1121 已提交
2351
    ctgdShowCacheInfo();
D
dapan1121 已提交
2352 2353 2354
  }

  qInfo("catalog update thread stopped");
dengyihao's avatar
dengyihao 已提交
2355

D
dapan1121 已提交
2356 2357 2358 2359 2360 2361 2362 2363 2364 2365 2366 2367
  return NULL;
}

/*
 * Create the joinable catalog update thread running ctgUpdateThreadFunc and
 * store its handle in gCtgMgmt.updateThread.
 *
 * The thread attribute object is destroyed on both the success and the
 * failure path; previously it was leaked when thread creation failed.
 *
 * @return TSDB_CODE_SUCCESS, or the system error derived from errno (also
 *         stored in terrno) when the thread could not be created
 */
int32_t ctgStartUpdateThread() {
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

  int32_t createRet = taosThreadCreate(&gCtgMgmt.updateThread, &thAttr, ctgUpdateThreadFunc, NULL);

  // capture errno before any further call can overwrite it
  int32_t sysErr = (0 != createRet) ? TAOS_SYSTEM_ERROR(errno) : TSDB_CODE_SUCCESS;

  taosThreadAttrDestroy(&thAttr);

  if (0 != createRet) {
    terrno = sysErr;
    CTG_ERR_RET(terrno);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
2373
/*
 * Look up a single table meta in the cache.
 *
 * Marks the context as system-db when the name refers to a system database,
 * then reads the meta via ctgReadTbMetaFromCache. A cached meta is handed to
 * the caller only if its table type matches the requested flag and either no
 * forced update is requested or the table belongs to a system database.
 * Otherwise the meta is freed, *pTableMeta is set to NULL and, if the stb
 * flag is still unknown, it is derived from the table type recorded in
 * ctx->tbInfo.
 *
 * @param pCtg        catalog handle
 * @param ctx         lookup context (name, flags, table info)
 * @param pTableMeta  out: cached meta, or NULL when it cannot be used
 * @return TSDB_CODE_SUCCESS, or the error from ctgReadTbMetaFromCache
 */
int32_t ctgGetTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta **pTableMeta) {
  if (IS_SYS_DBNAME(ctx->pName->dbname)) {
    CTG_FLAG_SET_SYS_DB(ctx->flag);
  }

  CTG_ERR_RET(ctgReadTbMetaFromCache(pCtg, ctx, pTableMeta));

  if (NULL != *pTableMeta) {
    bool usable = CTG_FLAG_MATCH_STB(ctx->flag, (*pTableMeta)->tableType) &&
                  ((!CTG_FLAG_IS_FORCE_UPDATE(ctx->flag)) || (CTG_FLAG_IS_SYS_DB(ctx->flag)));
    if (usable) {
      return TSDB_CODE_SUCCESS;
    }

    // cached copy cannot be used for this request: drop it
    taosMemoryFreeClear(*pTableMeta);
    *pTableMeta = NULL;
  }

  if (CTG_FLAG_IS_UNKNOWN_STB(ctx->flag)) {
    CTG_FLAG_SET_STB(ctx->flag, ctx->tbInfo.tbType);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
2397
#if 0
/*
 * Disabled (compiled out by the surrounding #if 0): per-table batch lookup of
 * table metas. For every name in ctx->pNames one SMetaRes is pushed to
 * ctx->pResList; names that cannot be served from the cache additionally get
 * a SCtgFetch entry in ctx->pFetchs. When nothing needs fetching the result
 * list is swapped into *pResList.
 */
int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetasCtx* ctx, SArray** pResList) {
  int32_t tbNum = taosArrayGetSize(ctx->pNames);
  SName* fName = taosArrayGet(ctx->pNames, 0);
  int32_t fIdx = 0;

  for (int32_t i = 0; i < tbNum; ++i) {
    SCtgTbMetaCtx nctx = {0};
    nctx.flag = CTG_FLAG_UNKNOWN_STB;
    nctx.pName = taosArrayGet(ctx->pNames, i);

    if (IS_SYS_DBNAME(nctx.pName->dbname)) {
      CTG_FLAG_SET_SYS_DB(nctx.flag);
    }

    STableMeta *pTableMeta = NULL;
    CTG_ERR_RET(ctgReadTbMetaFromCache(pCtg, &nctx, &pTableMeta));
    SMetaRes res = {0};

    if (pTableMeta) {
      bool usable = CTG_FLAG_MATCH_STB(nctx.flag, pTableMeta->tableType) &&
                    ((!CTG_FLAG_IS_FORCE_UPDATE(nctx.flag)) || (CTG_FLAG_IS_SYS_DB(nctx.flag)));
      if (usable) {
        res.pRes = pTableMeta;
      } else {
        taosMemoryFreeClear(pTableMeta);
      }
    }

    if (NULL == res.pRes) {
      if (NULL == ctx->pFetchs) {
        ctx->pFetchs = taosArrayInit(tbNum, sizeof(SCtgFetch));
      }

      if (CTG_FLAG_IS_UNKNOWN_STB(nctx.flag)) {
        CTG_FLAG_SET_STB(nctx.flag, nctx.tbInfo.tbType);
      }

      SCtgFetch fetch = {0};
      fetch.tbIdx = i;
      fetch.fetchIdx = fIdx++;
      fetch.flag = nctx.flag;

      taosArrayPush(ctx->pFetchs, &fetch);
    }

    taosArrayPush(ctx->pResList, &res);
  }

  if (NULL == ctx->pFetchs) {
    TSWAP(*pResList, ctx->pResList);
  }

  return TSDB_CODE_SUCCESS;
}
#endif

dengyihao's avatar
dengyihao 已提交
2454 2455 2456 2457 2458 2459 2460 2461 2462
/*
 * Batch lookup of table metas from the cache for the tables in pList. The
 * database is derived from the first name only, so all names are expected to
 * belong to the same database.
 *
 * For every name in pList exactly one SMetaRes is appended to ctx->pResList:
 *   - a heap copy of the table meta when it can be fully built from the
 *     cache (for child tables this means the child entry merged with its
 *     super table's schema), or
 *   - a zeroed entry plus a fetch request registered through ctgAddFetch
 *     when any required piece is missing from the cache.
 * Consecutive child tables sharing a super table reuse the previously built
 * meta (cloneTableMeta) instead of looking the super table up again.
 *
 * Cache-owned metas are only read while the entry's metaLock is held; values
 * needed afterwards (for logging) are taken from local copies.
 *
 * @param pCtg        catalog handle
 * @param pConn       unused here
 * @param ctx         batch context; pResList and pFetchs are appended to
 * @param dbIdx       database index forwarded to ctgAddFetch
 * @param fetchIdx    in/out fetch counter forwarded to ctgAddFetch
 * @param baseResIdx  result index of the first table of this database
 * @param pList       array of SName, must contain at least one entry
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY on allocation failure
 */
int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMetasCtx *ctx, int32_t dbIdx,
                               int32_t *fetchIdx, int32_t baseResIdx, SArray *pList) {
  int32_t     tbNum = taosArrayGetSize(pList);
  SName      *pName = taosArrayGet(pList, 0);
  char        dbFName[TSDB_DB_FNAME_LEN] = {0};
  int32_t     flag = CTG_FLAG_UNKNOWN_STB;
  uint64_t    lastSuid = 0;
  STableMeta *lastTableMeta = NULL;

  if (IS_SYS_DBNAME(pName->dbname)) {
    CTG_FLAG_SET_SYS_DB(flag);
    strcpy(dbFName, pName->dbname);
  } else {
    tNameGetFullDbName(pName, dbFName);
  }

  SCtgDBCache *dbCache = NULL;
  SCtgTbCache *pCache = NULL;
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);

  if (NULL == dbCache) {
    // whole database missing: every table has to be fetched
    ctgDebug("db %s not in cache", dbFName);
    for (int32_t i = 0; i < tbNum; ++i) {
      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      taosArrayPush(ctx->pResList, &(SMetaRes){0});
    }

    return TSDB_CODE_SUCCESS;
  }

  for (int32_t i = 0; i < tbNum; ++i) {
    pName = taosArrayGet(pList, i);

    pCache = taosHashAcquire(dbCache->tbCache, pName->tname, strlen(pName->tname));
    if (NULL == pCache) {
      ctgDebug("tb %s not in cache, dbFName:%s", pName->tname, dbFName);
      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      taosArrayPush(ctx->pResList, &(SMetaRes){0});

      continue;
    }

    CTG_LOCK(CTG_READ, &pCache->metaLock);
    if (NULL == pCache->pMeta) {
      CTG_UNLOCK(CTG_READ, &pCache->metaLock);
      // drop the hash reference taken by taosHashAcquire above
      taosHashRelease(dbCache->tbCache, pCache);

      ctgDebug("tb %s meta not in cache, dbFName:%s", pName->tname, dbFName);
      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      taosArrayPush(ctx->pResList, &(SMetaRes){0});

      continue;
    }

    STableMeta *tbMeta = pCache->pMeta;

    SCtgTbMetaCtx nctx = {0};
    nctx.flag = flag;
    nctx.tbInfo.inCache = true;
    nctx.tbInfo.dbId = dbCache->dbId;
    nctx.tbInfo.suid = tbMeta->suid;
    nctx.tbInfo.tbType = tbMeta->tableType;

    SMetaRes    res = {0};
    STableMeta *pTableMeta = NULL;
    if (tbMeta->tableType != TSDB_CHILD_TABLE) {
      int32_t metaSize = CTG_META_SIZE(tbMeta);
      pTableMeta = taosMemoryCalloc(1, metaSize);
      if (NULL == pTableMeta) {
        ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
        CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
      }

      memcpy(pTableMeta, tbMeta, metaSize);

      CTG_UNLOCK(CTG_READ, &pCache->metaLock);
      taosHashRelease(dbCache->tbCache, pCache);

      // tbMeta must not be touched after the release; log from the snapshot
      ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pName->tname, nctx.tbInfo.tbType, dbFName);

      res.pRes = pTableMeta;
      taosArrayPush(ctx->pResList, &res);

      continue;
    }

    // PROCESS FOR CHILD TABLE

    if (lastSuid && tbMeta->suid == lastSuid && lastTableMeta) {
      // same super table as the previous child: reuse the merged meta
      cloneTableMeta(lastTableMeta, &pTableMeta);
      if (NULL == pTableMeta) {
        ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
        CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
      }

      memcpy(pTableMeta, tbMeta, sizeof(SCTableMeta));

      CTG_UNLOCK(CTG_READ, &pCache->metaLock);
      taosHashRelease(dbCache->tbCache, pCache);

      ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pName->tname, nctx.tbInfo.tbType, dbFName);

      res.pRes = pTableMeta;
      taosArrayPush(ctx->pResList, &res);

      continue;
    }

    int32_t metaSize = sizeof(SCTableMeta);
    pTableMeta = taosMemoryCalloc(1, metaSize);
    if (NULL == pTableMeta) {
      ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    memcpy(pTableMeta, tbMeta, metaSize);

    CTG_UNLOCK(CTG_READ, &pCache->metaLock);
    taosHashRelease(dbCache->tbCache, pCache);

    ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", pName->tname,
             nctx.tbInfo.tbType, dbFName);

    // resolve the super table: suid -> stb name -> stb cache entry
    char *stName = taosHashAcquire(dbCache->stbCache, &pTableMeta->suid, sizeof(pTableMeta->suid));
    if (NULL == stName) {
      ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", pTableMeta->suid, dbFName);
      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      taosArrayPush(ctx->pResList, &(SMetaRes){0});

      taosMemoryFreeClear(pTableMeta);
      continue;
    }

    pCache = taosHashAcquire(dbCache->tbCache, stName, strlen(stName));
    if (NULL == pCache) {
      ctgDebug("stb 0x%" PRIx64 " name %s not in cache, dbFName:%s", pTableMeta->suid, stName, dbFName);
      taosHashRelease(dbCache->stbCache, stName);

      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      taosArrayPush(ctx->pResList, &(SMetaRes){0});

      taosMemoryFreeClear(pTableMeta);
      continue;
    }

    taosHashRelease(dbCache->stbCache, stName);

    CTG_LOCK(CTG_READ, &pCache->metaLock);
    if (NULL == pCache->pMeta) {
      ctgDebug("stb 0x%" PRIx64 " meta not in cache, dbFName:%s", pTableMeta->suid, dbFName);
      CTG_UNLOCK(CTG_READ, &pCache->metaLock);
      taosHashRelease(dbCache->tbCache, pCache);

      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      taosArrayPush(ctx->pResList, &(SMetaRes){0});

      taosMemoryFreeClear(pTableMeta);

      continue;
    }

    STableMeta *stbMeta = pCache->pMeta;
    if (stbMeta->suid != nctx.tbInfo.suid) {
      // snapshot the value for the log before the entry is released
      uint64_t cachedSuid = stbMeta->suid;

      CTG_UNLOCK(CTG_READ, &pCache->metaLock);
      taosHashRelease(dbCache->tbCache, pCache);

      ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%" PRIx64, cachedSuid,
               nctx.tbInfo.suid);

      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      taosArrayPush(ctx->pResList, &(SMetaRes){0});

      taosMemoryFreeClear(pTableMeta);

      continue;
    }

    // grow the child copy so the super table's schema part fits behind it
    metaSize = CTG_META_SIZE(stbMeta);
    STableMeta *pMerged = taosMemoryRealloc(pTableMeta, metaSize);
    if (NULL == pMerged) {
      // on failure the original buffer is still owned by us: release it
      taosMemoryFreeClear(pTableMeta);
      ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    pTableMeta = pMerged;

    memcpy(&pTableMeta->sversion, &stbMeta->sversion, metaSize - sizeof(SCTableMeta));

    CTG_UNLOCK(CTG_READ, &pCache->metaLock);
    taosHashRelease(dbCache->tbCache, pCache);

    res.pRes = pTableMeta;
    taosArrayPush(ctx->pResList, &res);

    lastSuid = pTableMeta->suid;
    lastTableMeta = pTableMeta;
  }

  ctgReleaseDBCache(pCtg, dbCache);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2648
/*
 * Remove a table's meta from the cache by queueing the matching drop
 * operation.
 *
 * The table is first read from the cache to learn its type: super tables are
 * dropped through ctgDropStbMetaEnqueue, all other tables through
 * ctgDropTbMetaEnqueue. When the table is not cached nothing is queued.
 *
 * @param pCtg        catalog handle
 * @param pTableName  table to remove
 * @param syncReq     forwarded to the enqueue call
 * @return TSDB_CODE_SUCCESS or the error reported by the cache read / enqueue
 */
int32_t ctgRemoveTbMetaFromCache(SCatalog *pCtg, SName *pTableName, bool syncReq) {
  int32_t       code = 0;
  STableMeta   *tblMeta = NULL;
  SCtgTbMetaCtx tbCtx = {0};
  char          dbFName[TSDB_DB_FNAME_LEN];

  tbCtx.pName = pTableName;
  tbCtx.flag = CTG_FLAG_UNKNOWN_STB;

  CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &tbCtx, &tblMeta));

  if (NULL == tblMeta) {
    ctgDebug("table already not in cache, db:%s, tblName:%s", pTableName->dbname, pTableName->tname);
    return TSDB_CODE_SUCCESS;
  }

  tNameGetFullDbName(pTableName, dbFName);

  switch (tblMeta->tableType) {
    case TSDB_SUPER_TABLE:
      CTG_ERR_JRET(ctgDropStbMetaEnqueue(pCtg, dbFName, tbCtx.tbInfo.dbId, pTableName->tname, tblMeta->suid, syncReq));
      break;
    default:
      CTG_ERR_JRET(ctgDropTbMetaEnqueue(pCtg, dbFName, tbCtx.tbInfo.dbId, pTableName->tname, syncReq));
      break;
  }

_return:

  // the meta read above is a private copy that is released here
  taosMemoryFreeClear(tblMeta);

  CTG_RET(code);
}

/*
 * Resolve the vgroup a table hashes to, using only cached vgroup info.
 *
 * System databases are rejected with TSDB_CODE_CTG_INVALID_INPUT. When the
 * database's vgroup info is not cached, *pVgroup is set to NULL and success
 * is returned. Otherwise a SVgroupInfo is allocated, filled through
 * ctgGetVgInfoFromHashValue and handed to the caller, who must free it. On
 * any failure *pVgroup is freed and reset.
 *
 * @param pCtg        catalog handle
 * @param pTableName  table whose vgroup is wanted
 * @param pVgroup     out: allocated vgroup info, or NULL when not cached
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_CTG_INVALID_INPUT,
 *         TSDB_CODE_OUT_OF_MEMORY, or the error of the called helpers
 */
int32_t ctgGetTbHashVgroupFromCache(SCatalog *pCtg, const SName *pTableName, SVgroupInfo **pVgroup) {
  if (IS_SYS_DBNAME(pTableName->dbname)) {
    ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname);
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  SCtgDBCache *dbCache = NULL;
  int32_t      code = 0;
  char         dbFName[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, dbFName);

  CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache));

  if (NULL == dbCache) {
    *pVgroup = NULL;
    return TSDB_CODE_SUCCESS;
  }

  *pVgroup = taosMemoryCalloc(1, sizeof(SVgroupInfo));
  if (NULL == *pVgroup) {
    // never hand a NULL output buffer to the hash lookup
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pTableName, *pVgroup));

_return:

  if (dbCache) {
    ctgReleaseVgInfoToCache(pCtg, dbCache);
  }

  if (code) {
    taosMemoryFreeClear(*pVgroup);
  }

  CTG_RET(code);
}