ctgCache.c 68.3 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "catalogInt.h"
dengyihao's avatar
dengyihao 已提交
17
#include "query.h"
D
dapan1121 已提交
18
#include "systable.h"
dengyihao's avatar
dengyihao 已提交
19 20
#include "tname.h"
#include "trpc.h"
D
dapan1121 已提交
21

dengyihao's avatar
dengyihao 已提交
22 23 24 25 26 27 28 29 30 31 32
/*
 * Table of cache operations, one entry per operation id.
 * Each entry lists the operation id, a human-readable name (used for logging
 * via gCtgCacheOperation[opId].name) and, presumably, the handler function
 * that executes the operation -- confirm against the SCtgOperation definition.
 * Entries appear to be listed in operation-id order so the table can be
 * indexed directly by opId; keep that order when adding operations.
 */
SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = {{CTG_OP_UPDATE_VGROUP, "update vgInfo", ctgOpUpdateVgroup},
                                                {CTG_OP_UPDATE_TB_META, "update tbMeta", ctgOpUpdateTbMeta},
                                                {CTG_OP_DROP_DB_CACHE, "drop DB", ctgOpDropDbCache},
                                                {CTG_OP_DROP_DB_VGROUP, "drop DBVgroup", ctgOpDropDbVgroup},
                                                {CTG_OP_DROP_STB_META, "drop stbMeta", ctgOpDropStbMeta},
                                                {CTG_OP_DROP_TB_META, "drop tbMeta", ctgOpDropTbMeta},
                                                {CTG_OP_UPDATE_USER, "update user", ctgOpUpdateUser},
                                                {CTG_OP_UPDATE_VG_EPSET, "update epset", ctgOpUpdateEpset},
                                                {CTG_OP_UPDATE_TB_INDEX, "update tbIndex", ctgOpUpdateTbIndex},
                                                {CTG_OP_DROP_TB_INDEX, "drop tbIndex", ctgOpDropTbIndex},
                                                {CTG_OP_CLEAR_CACHE, "clear cache", ctgOpClearCache}};
D
dapan1121 已提交
33

D
dapan1121 已提交
34 35
/**
 * Read-lock a db's vgroup cache and report whether vgroup info is available.
 *
 * When *inCache is set to true the read lock on dbCache->vgCache.vgLock is
 * still held on return and the caller must drop it. When *inCache is set to
 * false the lock has already been released here.
 *
 * @param pCtg     catalog handle
 * @param dbCache  db cache entry to inspect
 * @param inCache  out: true only if the db is not flagged deleted and vgInfo is non-NULL
 * @return always TSDB_CODE_SUCCESS
 */
int32_t ctgRLockVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache, bool *inCache) {
  CTG_LOCK(CTG_READ, &dbCache->vgCache.vgLock);

  if (dbCache->deleted) {
    // db is being dropped: nothing usable, give the lock back
    CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock);
    ctgDebug("db is dropping, dbId:0x%" PRIx64, dbCache->dbId);
    *inCache = false;
  } else if (NULL == dbCache->vgCache.vgInfo) {
    // db entry exists but no vgroup info has been cached yet
    CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock);
    *inCache = false;
    ctgDebug("db vgInfo is empty, dbId:0x%" PRIx64, dbCache->dbId);
  } else {
    // hit: lock stays held for the caller
    *inCache = true;
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
59 60
/**
 * Write-lock a db's vgroup cache.
 *
 * On success the write lock on dbCache->vgCache.vgLock is held on return.
 * If the db entry is flagged deleted the lock is released again and
 * TSDB_CODE_CTG_DB_DROPPED is reported.
 *
 * @param pCtg     catalog handle
 * @param dbCache  db cache entry to lock
 * @return TSDB_CODE_SUCCESS with the lock held, or TSDB_CODE_CTG_DB_DROPPED
 */
int32_t ctgWLockVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache) {
  CTG_LOCK(CTG_WRITE, &dbCache->vgCache.vgLock);

  if (dbCache->deleted) {
    // the entry is on its way out: do not let the caller modify it
    ctgDebug("db is dropping, dbId:0x%" PRIx64, dbCache->dbId);
    CTG_UNLOCK(CTG_WRITE, &dbCache->vgCache.vgLock);
    CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
71
/* Drop the read lock on the db's vgroup cache. */
void ctgRUnlockVgInfo(SCtgDBCache *dbCache) {
  CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock);
}
D
dapan1121 已提交
72

dengyihao's avatar
dengyihao 已提交
73
/* Drop the write lock on the db's vgroup cache. */
void ctgWUnlockVgInfo(SCtgDBCache *dbCache) {
  CTG_UNLOCK(CTG_WRITE, &dbCache->vgCache.vgLock);
}
D
dapan1121 已提交
74

D
dapan1121 已提交
75 76 77 78
/**
 * Give back a db cache entry: drops the read lock on dbCache->dbLock, then
 * releases the entry to the catalog's dbCache hash table.
 *
 * @param pCtg     catalog that owns the dbCache hash table
 * @param dbCache  entry to release
 */
void ctgReleaseDBCache(SCatalog *pCtg, SCtgDBCache *dbCache) {
  CTG_UNLOCK(CTG_READ, &dbCache->dbLock);
  taosHashRelease(pCtg->dbCache, dbCache);
}
D
dapan1121 已提交
79

dengyihao's avatar
dengyihao 已提交
80
/**
 * Look up a db cache entry by full db name.
 *
 * @param pCtg     catalog whose dbCache hash table is searched
 * @param dbFName  full db name; if the part after the first '.' satisfies
 *                 IS_SYS_DBNAME, that part is used as the lookup key instead
 * @param pCache   out: the entry, or NULL if it is absent or flagged deleted
 * @param acquire  true: use taosHashAcquire and take the read lock on dbLock
 *                 (caller gives it back via ctgReleaseDBCache);
 *                 false: plain taosHashGet, no lock taken
 * @return always TSDB_CODE_SUCCESS
 */
int32_t ctgAcquireDBCacheImpl(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache, bool acquire) {
  char *dot = strchr(dbFName, '.');
  if (dot && IS_SYS_DBNAME(dot + 1)) {
    dbFName = dot + 1;
  }

  size_t       keyLen = strlen(dbFName);
  SCtgDBCache *dbCache = (SCtgDBCache *)(acquire ? taosHashAcquire(pCtg->dbCache, dbFName, keyLen)
                                                 : taosHashGet(pCtg->dbCache, dbFName, keyLen));

  if (NULL == dbCache) {
    *pCache = NULL;
    ctgDebug("db not in cache, dbFName:%s", dbFName);
    return TSDB_CODE_SUCCESS;
  }

  if (acquire) {
    CTG_LOCK(CTG_READ, &dbCache->dbLock);
  }

  if (!dbCache->deleted) {
    *pCache = dbCache;
    return TSDB_CODE_SUCCESS;
  }

  // entry is being removed: undo the acquire (if any) and report a miss
  if (acquire) {
    ctgReleaseDBCache(pCtg, dbCache);
  }

  *pCache = NULL;
  ctgDebug("db is removing from cache, dbFName:%s", dbFName);
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
119
/* Acquiring variant of the db cache lookup: delegates to ctgAcquireDBCacheImpl with acquire == true. */
int32_t ctgAcquireDBCache(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache) {
  const bool acquire = true;
  CTG_RET(ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, acquire));
}

dengyihao's avatar
dengyihao 已提交
123
/* Non-acquiring variant of the db cache lookup: delegates to ctgAcquireDBCacheImpl with acquire == false. */
int32_t ctgGetDBCache(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache) {
  const bool acquire = false;
  CTG_RET(ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, acquire));
}

dengyihao's avatar
dengyihao 已提交
127
/**
 * Release vgroup info obtained from the cache: drops the read lock on the
 * db's vgroup cache first, then releases the db cache entry itself.
 */
void ctgReleaseVgInfoToCache(SCatalog *pCtg, SCtgDBCache *dbCache) {
  CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock);
  ctgReleaseDBCache(pCtg, dbCache);
}
D
dapan1121 已提交
131

dengyihao's avatar
dengyihao 已提交
132
/**
 * Release a table cache entry acquired for meta access, and its db entry.
 *
 * @param pCtg     catalog handle
 * @param dbCache  db entry to release; may be NULL (skipped)
 * @param pCache   table entry whose metaLock read lock is dropped and which is
 *                 released to dbCache->tbCache; may be NULL (skipped)
 */
void ctgReleaseTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache) {
  if (NULL != pCache) {
    CTG_UNLOCK(CTG_READ, &pCache->metaLock);
    taosHashRelease(dbCache->tbCache, pCache);
  }

  if (NULL != dbCache) {
    ctgReleaseDBCache(pCtg, dbCache);
  }
}
D
dapan1121 已提交
142

dengyihao's avatar
dengyihao 已提交
143
/**
 * Release a table cache entry acquired for index access, and its db entry.
 *
 * @param pCtg     catalog handle
 * @param dbCache  db entry to release; may be NULL (skipped)
 * @param pCache   table entry whose indexLock read lock is dropped and which is
 *                 released to dbCache->tbCache; may be NULL (skipped)
 */
void ctgReleaseTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache) {
  if (NULL != pCache) {
    CTG_UNLOCK(CTG_READ, &pCache->indexLock);
    taosHashRelease(dbCache->tbCache, pCache);
  }

  if (NULL != dbCache) {
    ctgReleaseDBCache(pCtg, dbCache);
  }
}

dengyihao's avatar
dengyihao 已提交
154
/**
 * Acquire a db cache entry together with a read lock on its vgroup info.
 *
 * On a hit *pCache is the acquired entry (db entry acquired and vgLock
 * read-locked). On a miss *pCache is NULL and nothing is left held.
 * Hit/miss counters numOfVgHit / numOfVgMiss are bumped accordingly.
 *
 * @return always TSDB_CODE_SUCCESS
 */
int32_t ctgAcquireVgInfoFromCache(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache) {
  SCtgDBCache *dbCache = NULL;
  bool         inCache = false;

  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
  if (NULL != dbCache) {
    ctgRLockVgInfo(pCtg, dbCache, &inCache);
    if (inCache) {
      *pCache = dbCache;

      CTG_CACHE_STAT_INC(numOfVgHit, 1);

      ctgDebug("Got db vgInfo from cache, dbFName:%s", dbFName);
      return TSDB_CODE_SUCCESS;
    }

    // db is cached but its vgroup info is not: hand the db entry back
    ctgDebug("vgInfo of db %s not in cache", dbFName);
    ctgReleaseDBCache(pCtg, dbCache);
  } else {
    ctgDebug("db %s not in cache", dbFName);
  }

  *pCache = NULL;

  CTG_CACHE_STAT_INC(numOfVgMiss, 1);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
190
/**
 * Acquire a table's cached meta for reading.
 *
 * On a hit, *pDb / *pTb are the acquired db and table entries; the table
 * entry's metaLock is read-locked and the caller must hand both back with
 * ctgReleaseTbMetaToCache. On a miss everything acquired here is released and
 * *pDb / *pTb are set to NULL, so callers never see stale handles (same
 * contract as ctgAcquireStbMetaFromCache).
 *
 * @param pCtg     catalog handle
 * @param dbFName  full db name
 * @param tbName   table name, used as the key into the db's tbCache
 * @param pDb      out: db cache entry or NULL
 * @param pTb      out: table cache entry or NULL
 * @return always TSDB_CODE_SUCCESS; a miss is signalled by *pTb == NULL
 */
int32_t ctgAcquireTbMetaFromCache(SCatalog *pCtg, char *dbFName, char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb) {
  SCtgDBCache *dbCache = NULL;
  SCtgTbCache *pCache = NULL;
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
  if (NULL == dbCache) {
    ctgDebug("db %s not in cache", dbFName);
    goto _return;
  }

  pCache = taosHashAcquire(dbCache->tbCache, tbName, strlen(tbName));
  if (NULL == pCache) {
    ctgDebug("tb %s not in cache, dbFName:%s", tbName, dbFName);
    goto _return;
  }

  CTG_LOCK(CTG_READ, &pCache->metaLock);
  if (NULL == pCache->pMeta) {
    // entry exists (e.g. only index info cached) but carries no meta
    ctgDebug("tb %s meta not in cache, dbFName:%s", tbName, dbFName);
    goto _return;
  }

  *pDb = dbCache;
  *pTb = pCache;

  ctgDebug("tb %s meta got in cache, dbFName:%s", tbName, dbFName);

  CTG_CACHE_STAT_INC(numOfMetaHit, 1);

  return TSDB_CODE_SUCCESS;

_return:

  // releases whichever of the two entries were acquired (NULL ones are skipped)
  ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);

  CTG_CACHE_STAT_INC(numOfMetaMiss, 1);

  // make the miss explicit instead of relying on the caller's initial values
  *pDb = NULL;
  *pTb = NULL;

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
229 230 231
/**
 * Acquire a super table's cached meta for reading, looked up by suid.
 *
 * The suid is first mapped to the super table name through the db's stbCache,
 * and that name is then used as the key into the db's tbCache.
 *
 * On a hit, *pDb / *pTb are the acquired db and table entries; the table
 * entry's metaLock is read-locked and the caller must hand both back with
 * ctgReleaseTbMetaToCache. On a miss everything acquired here is released and
 * *pDb / *pTb are set to NULL.
 *
 * @param pCtg     catalog handle
 * @param dbFName  full db name
 * @param suid     super table uid
 * @param pDb      out: db cache entry or NULL
 * @param pTb      out: table cache entry or NULL
 * @return always TSDB_CODE_SUCCESS; a miss is signalled by *pTb == NULL
 */
int32_t ctgAcquireStbMetaFromCache(SCatalog *pCtg, char *dbFName, uint64_t suid, SCtgDBCache **pDb, SCtgTbCache **pTb) {
  SCtgDBCache *dbCache = NULL;
  SCtgTbCache *pCache = NULL;
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
  if (NULL == dbCache) {
    ctgDebug("db %s not in cache", dbFName);
    goto _return;
  }

  char *stName = taosHashAcquire(dbCache->stbCache, &suid, sizeof(suid));
  if (NULL == stName) {
    ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", suid, dbFName);
    goto _return;
  }

  pCache = taosHashAcquire(dbCache->tbCache, stName, strlen(stName));
  if (NULL == pCache) {
    ctgDebug("stb 0x%" PRIx64 " name %s not in cache, dbFName:%s", suid, stName, dbFName);
    taosHashRelease(dbCache->stbCache, stName);
    goto _return;
  }

  // The name was only needed as the tbCache key. Release the stbCache
  // reference now so it is not leaked on the hit and meta-missing paths below.
  taosHashRelease(dbCache->stbCache, stName);

  CTG_LOCK(CTG_READ, &pCache->metaLock);
  if (NULL == pCache->pMeta) {
    ctgDebug("stb 0x%" PRIx64 " meta not in cache, dbFName:%s", suid, dbFName);
    goto _return;
  }

  *pDb = dbCache;
  *pTb = pCache;

  ctgDebug("stb 0x%" PRIx64 " meta got in cache, dbFName:%s", suid, dbFName);

  CTG_CACHE_STAT_INC(numOfMetaHit, 1);

  return TSDB_CODE_SUCCESS;

_return:

  // releases whichever of the two entries were acquired (NULL ones are skipped)
  ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);

  CTG_CACHE_STAT_INC(numOfMetaMiss, 1);

  *pDb = NULL;
  *pTb = NULL;

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
278
/**
 * Acquire a table's cached index info for reading.
 *
 * On a hit, *pDb / *pTb are the acquired db and table entries; the table
 * entry's indexLock is read-locked and the caller must hand both back with
 * ctgReleaseTbIndexToCache. On a miss everything acquired here is released and
 * *pDb / *pTb are set to NULL, so callers never see stale handles.
 *
 * @param pCtg     catalog handle
 * @param dbFName  full db name
 * @param tbName   table name, used as the key into the db's tbCache
 * @param pDb      out: db cache entry or NULL
 * @param pTb      out: table cache entry or NULL
 * @return always TSDB_CODE_SUCCESS; a miss is signalled by *pTb == NULL
 */
int32_t ctgAcquireTbIndexFromCache(SCatalog *pCtg, char *dbFName, char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb) {
  SCtgDBCache *dbCache = NULL;
  SCtgTbCache *pCache = NULL;
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
  if (NULL == dbCache) {
    ctgDebug("db %s not in cache", dbFName);
    goto _return;
  }

  pCache = taosHashAcquire(dbCache->tbCache, tbName, strlen(tbName));
  if (NULL == pCache) {
    ctgDebug("tb %s not in cache, dbFName:%s", tbName, dbFName);
    goto _return;
  }

  CTG_LOCK(CTG_READ, &pCache->indexLock);
  if (NULL == pCache->pIndex) {
    // entry exists (e.g. only meta cached) but carries no index info
    ctgDebug("tb %s index not in cache, dbFName:%s", tbName, dbFName);
    goto _return;
  }

  *pDb = dbCache;
  *pTb = pCache;

  ctgDebug("tb %s index got in cache, dbFName:%s", tbName, dbFName);

  CTG_CACHE_STAT_INC(numOfIndexHit, 1);

  return TSDB_CODE_SUCCESS;

_return:

  // releases whichever of the two entries were acquired (NULL ones are skipped)
  ctgReleaseTbIndexToCache(pCtg, dbCache, pCache);

  CTG_CACHE_STAT_INC(numOfIndexMiss, 1);

  // make the miss explicit instead of relying on the caller's initial values
  *pDb = NULL;
  *pTb = NULL;

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
318
/**
 * Report whether a table's meta is present in the cache.
 *
 * @param pCtg     catalog handle
 * @param dbFName  full db name
 * @param tbName   table name
 * @param exist    out: 1 if the meta was found, 0 otherwise
 * @return always TSDB_CODE_SUCCESS
 */
int32_t ctgTbMetaExistInCache(SCatalog *pCtg, char *dbFName, char *tbName, int32_t *exist) {
  SCtgDBCache *dbCache = NULL;
  SCtgTbCache *tbCache = NULL;

  ctgAcquireTbMetaFromCache(pCtg, dbFName, tbName, &dbCache, &tbCache);

  if (NULL != tbCache) {
    *exist = 1;
    ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
  } else {
    ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
    *exist = 0;
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
335 336
/**
 * Build a private copy of a table's meta from the cache.
 *
 * For non-child tables the cached meta is copied as a whole. For child tables
 * the child-specific part (sizeof(SCTableMeta)) is copied first, then the
 * super table's meta is looked up by suid and everything from `sversion`
 * onwards is copied from it into the (re-sized) result.
 *
 * Cache entries are released on exactly one path each: either right after the
 * copy succeeded or in `_return` on error. Error branches must NOT release
 * before jumping to `_return`, otherwise the entry is unlocked/released twice.
 *
 * @param pCtg        catalog handle
 * @param ctx         lookup context; ctx->pName and ctx->flag select the table,
 *                    ctx->tbInfo is filled in when the table itself is cached
 * @param pTableMeta  out: newly allocated meta owned by the caller, or NULL if
 *                    the table (or, for a child table, its super table) is not
 *                    cached, or on error
 * @return TSDB_CODE_SUCCESS (also for a cache miss), TSDB_CODE_OUT_OF_MEMORY,
 *         or TSDB_CODE_CTG_INTERNAL_ERROR on a suid mismatch in the stb cache
 */
int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta **pTableMeta) {
  int32_t      code = 0;
  SCtgDBCache *dbCache = NULL;
  SCtgTbCache *tbCache = NULL;
  *pTableMeta = NULL;

  char dbFName[TSDB_DB_FNAME_LEN] = {0};
  if (CTG_FLAG_IS_SYS_DB(ctx->flag)) {
    tstrncpy(dbFName, ctx->pName->dbname, sizeof(dbFName));
  } else {
    tNameGetFullDbName(ctx->pName, dbFName);
  }

  ctgAcquireTbMetaFromCache(pCtg, dbFName, ctx->pName->tname, &dbCache, &tbCache);
  if (NULL == tbCache) {
    ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
    return TSDB_CODE_SUCCESS;
  }

  STableMeta *tbMeta = tbCache->pMeta;
  ctx->tbInfo.inCache = true;
  ctx->tbInfo.dbId = dbCache->dbId;
  ctx->tbInfo.suid = tbMeta->suid;
  ctx->tbInfo.tbType = tbMeta->tableType;

  if (tbMeta->tableType != TSDB_CHILD_TABLE) {
    int32_t metaSize = CTG_META_SIZE(tbMeta);
    *pTableMeta = taosMemoryCalloc(1, metaSize);
    if (NULL == *pTableMeta) {
      // _return releases the cache entries
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }

    memcpy(*pTableMeta, tbMeta, metaSize);

    ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
    // log from the saved copy: tbMeta must not be read after the release
    ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", ctx->pName->tname, ctx->tbInfo.tbType, dbFName);
    return TSDB_CODE_SUCCESS;
  }

  // PROCESS FOR CHILD TABLE

  int32_t metaSize = sizeof(SCTableMeta);
  *pTableMeta = taosMemoryCalloc(1, metaSize);
  if (NULL == *pTableMeta) {
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  memcpy(*pTableMeta, tbMeta, metaSize);

  ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
  // the handles are dead now; never let a later path release them again
  dbCache = NULL;
  tbCache = NULL;
  ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", ctx->pName->tname,
           ctx->tbInfo.tbType, dbFName);

  ctgAcquireStbMetaFromCache(pCtg, dbFName, ctx->tbInfo.suid, &dbCache, &tbCache);
  if (NULL == tbCache) {
    ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
    taosMemoryFreeClear(*pTableMeta);
    ctgDebug("stb 0x%" PRIx64 " meta not in cache", ctx->tbInfo.suid);
    return TSDB_CODE_SUCCESS;
  }

  STableMeta *stbMeta = tbCache->pMeta;
  if (stbMeta->suid != ctx->tbInfo.suid) {
    // log while the entry is still held, release happens in _return
    ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%" PRIx64, stbMeta->suid, ctx->tbInfo.suid);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  metaSize = CTG_META_SIZE(stbMeta);
  // Keep the old pointer until the resize succeeded so _return can still free
  // it (assumes taosMemoryRealloc follows realloc semantics on failure).
  STableMeta *resized = taosMemoryRealloc(*pTableMeta, metaSize);
  if (NULL == resized) {
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }
  *pTableMeta = resized;

  memcpy(&(*pTableMeta)->sversion, &stbMeta->sversion, metaSize - sizeof(SCTableMeta));

  ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);

  ctgDebug("Got tb %s meta from cache, dbFName:%s", ctx->pName->tname, dbFName);

  return TSDB_CODE_SUCCESS;

_return:

  // single release point for every error path
  ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
  taosMemoryFreeClear(*pTableMeta);

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
427 428
/**
 * Read a table's schema/tag versions from the cache.
 *
 * For non-child tables the versions come from the table's own cached meta.
 * For child tables they come from the super table's meta (looked up by suid),
 * and the super table's name (the hash key of its cache entry) is copied into
 * stbName.
 *
 * @param pCtg        catalog handle
 * @param pTableName  table to look up
 * @param sver        out: schema version, -1 if not available from the cache
 * @param tver        out: tag version, -1 if not available from the cache
 * @param tbType      out: table type; only written when the table is cached
 * @param suid        out: super table uid; only written when the table is cached
 * @param stbName     out: super table name, NUL-terminated; only written for a
 *                    child table whose super table is cached. The buffer must be
 *                    large enough for the key plus terminator (not checked here).
 * @return TSDB_CODE_SUCCESS (also for a cache miss), or
 *         TSDB_CODE_CTG_INTERNAL_ERROR on a suid mismatch in the stb cache
 */
int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver, int32_t *tver, int32_t *tbType,
                              uint64_t *suid, char *stbName) {
  *sver = -1;
  *tver = -1;

  SCtgDBCache *dbCache = NULL;
  SCtgTbCache *tbCache = NULL;
  char         dbFName[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, dbFName);

  ctgAcquireTbMetaFromCache(pCtg, dbFName, pTableName->tname, &dbCache, &tbCache);
  if (NULL == tbCache) {
    ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
    return TSDB_CODE_SUCCESS;
  }

  STableMeta *tbMeta = tbCache->pMeta;
  *tbType = tbMeta->tableType;
  *suid = tbMeta->suid;

  if (*tbType != TSDB_CHILD_TABLE) {
    *sver = tbMeta->sversion;
    *tver = tbMeta->tversion;

    ctgDebug("Got tb %s ver from cache, dbFName:%s, tbType:%d, sver:%d, tver:%d, suid:0x%" PRIx64, pTableName->tname,
             dbFName, *tbType, *sver, *tver, *suid);

    ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
    return TSDB_CODE_SUCCESS;
  }

  // PROCESS FOR CHILD TABLE

  ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
  // the handles are dead now; do not hand stale pointers to the next lookup
  dbCache = NULL;
  tbCache = NULL;
  ctgDebug("Got ctb %s ver from cache, will continue to get its stb ver, dbFName:%s", pTableName->tname, dbFName);

  ctgAcquireStbMetaFromCache(pCtg, dbFName, *suid, &dbCache, &tbCache);
  if (NULL == tbCache) {
    ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
    ctgDebug("stb 0x%" PRIx64 " meta not in cache", *suid);
    return TSDB_CODE_SUCCESS;
  }

  STableMeta *stbMeta = tbCache->pMeta;
  if (stbMeta->suid != *suid) {
    // log first: stbMeta must not be read once the entry has been released
    ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid:0x%" PRIx64, stbMeta->suid, *suid);
    ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  // the hash key of the table cache entry is the super table's name
  size_t nameLen = 0;
  char  *name = taosHashGetKey(tbCache, &nameLen);

  strncpy(stbName, name, nameLen);
  stbName[nameLen] = 0;

  *sver = stbMeta->sversion;
  *tver = stbMeta->tversion;

  ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);

  ctgDebug("Got tb %s sver %d tver %d from cache, type:%d, dbFName:%s", pTableName->tname, *sver, *tver, *tbType,
           dbFName);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
494
/**
 * Read a table's type from its cached meta.
 *
 * @param pCtg     catalog handle
 * @param dbFName  full db name
 * @param tbName   table name
 * @param tbType   out: table type; left untouched when the meta is not cached
 * @return TSDB_CODE_SUCCESS (also for a cache miss), or the error reported by
 *         ctgAcquireTbMetaFromCache
 */
int32_t ctgReadTbTypeFromCache(SCatalog *pCtg, char *dbFName, char *tbName, int32_t *tbType) {
  SCtgDBCache *dbCache = NULL;
  SCtgTbCache *tbCache = NULL;

  CTG_ERR_RET(ctgAcquireTbMetaFromCache(pCtg, dbFName, tbName, &dbCache, &tbCache));

  if (NULL != tbCache) {
    *tbType = tbCache->pMeta->tableType;
    ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);

    ctgDebug("Got tb %s tbType %d from cache, dbFName:%s", tbName, *tbType, dbFName);
  } else {
    ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
511 512
/**
 * Clone a table's cached index list.
 *
 * @param pCtg        catalog handle
 * @param pTableName  table to look up
 * @param pRes        out: result of ctgCloneTableIndex on the cached index
 *                    array, or NULL when the index info is not cached
 * @return TSDB_CODE_SUCCESS (also for a cache miss), or the error reported by
 *         ctgCloneTableIndex
 */
int32_t ctgReadTbIndexFromCache(SCatalog *pCtg, SName *pTableName, SArray **pRes) {
  int32_t      code = 0;
  SCtgDBCache *dbCache = NULL;
  SCtgTbCache *tbCache = NULL;
  char         dbFName[TSDB_DB_FNAME_LEN] = {0};

  tNameGetFullDbName(pTableName, dbFName);
  *pRes = NULL;

  ctgAcquireTbIndexFromCache(pCtg, dbFName, pTableName->tname, &dbCache, &tbCache);
  if (NULL == tbCache) {
    // miss: nothing to clone, just give back whatever was handed out
    ctgReleaseTbIndexToCache(pCtg, dbCache, tbCache);
    return TSDB_CODE_SUCCESS;
  }

  CTG_ERR_JRET(ctgCloneTableIndex(tbCache->pIndex->pIndex, pRes));

_return:

  // reached on success and on clone failure alike
  ctgReleaseTbIndexToCache(pCtg, dbCache, tbCache);

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
535
/**
 * Check a user's access to a db using only the cached user auth info.
 *
 * Both outputs are always written: they start out false and are raised only
 * on the paths below, so the result no longer depends on how the caller
 * initialised them.
 *
 * - If the db name (the part after the first '.', or the whole name when there
 *   is no '.') satisfies IS_SYS_DBNAME: *inCache = true, *pass = true.
 * - If the user is not in pCtg->userCache: *inCache = false (miss counted).
 * - Otherwise *inCache = true and *pass is true when the user is a super user,
 *   created the db, or has the db in readDbs / writeDbs matching `type`.
 *
 * @param pCtg     catalog handle
 * @param user     user name, key into pCtg->userCache
 * @param dbFName  full db name, key into the user's db sets
 * @param type     requested access (AUTH_TYPE_READ / AUTH_TYPE_WRITE are checked here)
 * @param inCache  out: whether the answer could be derived from the cache
 * @param pass     out: whether access is granted (meaningful when *inCache is true)
 * @return always TSDB_CODE_SUCCESS
 */
int32_t ctgChkAuthFromCache(SCatalog *pCtg, char *user, char *dbFName, AUTH_TYPE type, bool *inCache, bool *pass) {
  *inCache = false;
  *pass = false;

  char *p = strchr(dbFName, '.');
  if (p) {
    ++p;
  } else {
    p = dbFName;
  }

  if (IS_SYS_DBNAME(p)) {
    *inCache = true;
    *pass = true;
    ctgDebug("sysdb %s, pass", dbFName);
    return TSDB_CODE_SUCCESS;
  }

  SCtgUserAuth *pUser = (SCtgUserAuth *)taosHashGet(pCtg->userCache, user, strlen(user));
  if (NULL == pUser) {
    ctgDebug("user not in cache, user:%s", user);
    CTG_CACHE_STAT_INC(numOfUserMiss, 1);
    return TSDB_CODE_SUCCESS;
  }

  *inCache = true;

  ctgDebug("Got user from cache, user:%s", user);
  CTG_CACHE_STAT_INC(numOfUserHit, 1);

  if (pUser->superUser) {
    *pass = true;
    return TSDB_CODE_SUCCESS;
  }

  size_t dbFNameLen = strlen(dbFName);

  // the db sets are only consulted under the user's read lock
  CTG_LOCK(CTG_READ, &pUser->lock);
  if (pUser->createdDbs && taosHashGet(pUser->createdDbs, dbFName, dbFNameLen)) {
    *pass = true;
    CTG_UNLOCK(CTG_READ, &pUser->lock);
    return TSDB_CODE_SUCCESS;
  }

  if (pUser->readDbs && taosHashGet(pUser->readDbs, dbFName, dbFNameLen) && type == AUTH_TYPE_READ) {
    *pass = true;
  }

  if (pUser->writeDbs && taosHashGet(pUser->writeDbs, dbFName, dbFNameLen) && type == AUTH_TYPE_WRITE) {
    *pass = true;
  }

  CTG_UNLOCK(CTG_READ, &pUser->lock);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
593
/**
 * Pop the next operation from the global cache-operation queue.
 *
 * The current head node is discarded and its successor becomes the new head;
 * the successor's operation is what gets handed out. The successor is
 * dereferenced unconditionally, so the caller must only call this when an
 * element is known to be queued.
 *
 * @param op  out: operation carried by the new head node
 */
void ctgDequeue(SCtgCacheOperation **op) {
  SCtgQNode *oldHead = gCtgMgmt.queue.head;
  SCtgQNode *next = oldHead->next;

  gCtgMgmt.queue.head = next;

  CTG_QUEUE_DEC();

  taosMemoryFreeClear(oldHead);

  *op = next->op;
}

dengyihao's avatar
dengyihao 已提交
606
/**
 * Append an operation to the global cache-operation queue and wake the consumer.
 *
 * If the node cannot be allocated, operation->data and the operation itself
 * are freed here. If the queue has been stopped, the node is freed via
 * ctgFreeQNode and TSDB_CODE_CTG_EXIT is returned.
 *
 * For a synchronous operation (operation->syncOp) the call blocks on
 * operation->rspSem until it is posted, then frees the operation. Unless
 * operation->unLocked is set, the read lock on gCtgMgmt.lock is dropped while
 * waiting and re-taken afterwards.
 *
 * @param pCtg       catalog handle
 * @param operation  operation to queue
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY or TSDB_CODE_CTG_EXIT
 */
int32_t ctgEnqueue(SCatalog *pCtg, SCtgCacheOperation *operation) {
  SCtgQNode *node = taosMemoryCalloc(1, sizeof(SCtgQNode));
  if (NULL == node) {
    qError("calloc %d failed", (int32_t)sizeof(SCtgQNode));
    taosMemoryFree(operation->data);
    taosMemoryFree(operation);
    CTG_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  // Copied out before the node is linked into the queue; presumably because a
  // non-sync operation may be consumed and freed by the queue's reader as soon
  // as it is visible -- TODO confirm against the consumer.
  bool  syncOp = operation->syncOp;
  char *opName = gCtgCacheOperation[operation->opId].name;

  if (syncOp) {
    tsem_init(&operation->rspSem, 0, 0);
  }

  node->op = operation;

  CTG_LOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);

  if (gCtgMgmt.queue.stopQueue) {
    // queue no longer accepts work
    ctgFreeQNode(node);
    CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);
    CTG_RET(TSDB_CODE_CTG_EXIT);
  }

  // an operation may itself request that the queue stops after it
  gCtgMgmt.queue.stopQueue = operation->stopQueue;
  gCtgMgmt.queue.tail->next = node;
  gCtgMgmt.queue.tail = node;

  CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);

  ctgDebug("action [%s] added into queue", opName);

  CTG_QUEUE_INC();
  CTG_RT_STAT_INC(numOfOpEnqueue, 1);

  tsem_post(&gCtgMgmt.queue.reqSem);

  if (!syncOp) {
    return TSDB_CODE_SUCCESS;
  }

  // synchronous operation: wait for the response, then free the operation here
  if (!operation->unLocked) {
    CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock);
  }
  tsem_wait(&operation->rspSem);
  if (!operation->unLocked) {
    CTG_LOCK(CTG_READ, &gCtgMgmt.lock);
  }
  taosMemoryFree(operation);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
655 656
/**
 * Queue a "drop DB" cache operation.
 *
 * @param pCtg     catalog handle
 * @param dbFName  full db name; if the part after the first '.' satisfies
 *                 IS_SYS_DBNAME, only that part is stored in the message
 * @param dbId     db id stored in the message
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the error from ctgEnqueue
 */
int32_t ctgDropDbCacheEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // previously dereferenced unchecked
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }
  op->opId = CTG_OP_DROP_DB_CACHE;

  SCtgDropDBMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDBMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDBMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  const char *p = strchr(dbFName, '.');
  if (p && IS_SYS_DBNAME(p + 1)) {
    dbFName = p + 1;
  }

  msg->pCtg = pCtg;
  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  msg->dbId = dbId;

  op->data = msg;

  // from here on op and msg are handed to ctgEnqueue, including on failure
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
687 688
/**
 * Queue a "drop DBVgroup" cache operation.
 *
 * @param pCtg     catalog handle
 * @param dbFName  full db name; if the part after the first '.' satisfies
 *                 IS_SYS_DBNAME, only that part is stored in the message
 * @param syncOp   whether ctgEnqueue should wait for the operation to finish
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the error from ctgEnqueue
 */
int32_t ctgDropDbVgroupEnqueue(SCatalog *pCtg, const char *dbFName, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // previously dereferenced unchecked
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }
  op->opId = CTG_OP_DROP_DB_VGROUP;
  op->syncOp = syncOp;

  SCtgDropDbVgroupMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDbVgroupMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDbVgroupMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  const char *p = strchr(dbFName, '.');
  if (p && IS_SYS_DBNAME(p + 1)) {
    dbFName = p + 1;
  }

  msg->pCtg = pCtg;
  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));

  op->data = msg;

  // from here on op and msg are handed to ctgEnqueue, including on failure
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
719 720 721
/**
 * Queue a "drop stbMeta" cache operation.
 *
 * @param pCtg     catalog handle
 * @param dbFName  full db name, copied into the message
 * @param dbId     db id stored in the message
 * @param stbName  super table name, copied into the message
 * @param suid     super table uid stored in the message
 * @param syncOp   whether ctgEnqueue should wait for the operation to finish
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the error from ctgEnqueue
 */
int32_t ctgDropStbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid,
                              bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // previously dereferenced unchecked
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }
  op->opId = CTG_OP_DROP_STB_META;
  op->syncOp = syncOp;

  SCtgDropStbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropStbMetaMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropStbMetaMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  tstrncpy(msg->stbName, stbName, sizeof(msg->stbName));
  msg->dbId = dbId;
  msg->suid = suid;

  op->data = msg;

  // from here on op and msg are handed to ctgEnqueue, including on failure
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
750 751
/**
 * Queue a "drop tbMeta" cache operation.
 *
 * @param pCtg     catalog handle
 * @param dbFName  full db name, copied into the message
 * @param dbId     db id stored in the message
 * @param tbName   table name, copied into the message
 * @param syncOp   whether ctgEnqueue should wait for the operation to finish
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY, or the error from ctgEnqueue
 */
int32_t ctgDropTbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // previously dereferenced unchecked
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }
  op->opId = CTG_OP_DROP_TB_META;
  op->syncOp = syncOp;

  SCtgDropTblMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTblMetaMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTblMetaMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  tstrncpy(msg->tbName, tbName, sizeof(msg->tbName));
  msg->dbId = dbId;

  op->data = msg;

  // from here on op and msg are handed to ctgEnqueue, including on failure
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
779 780
/*
 * Build a CTG_OP_UPDATE_VGROUP cache operation carrying dbInfo and pass it to ctgEnqueue().
 *
 * pCtg     catalog handle recorded in the message
 * dbFName  db name; when the part after the first '.' is a system db name, only that part is stored
 * dbId     db id recorded in the message
 * dbInfo   vgroup info attached to the message; released with ctgFreeVgInfo() on every failure
 *          path of this function
 * syncOp   stored in the operation's syncOp field
 *
 * Returns TSDB_CODE_SUCCESS when the operation was enqueued, TSDB_CODE_OUT_OF_MEMORY when an
 * allocation fails, otherwise the code reported by ctgEnqueue().
 */
int32_t ctgUpdateVgroupEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, SDBVgInfo *dbInfo, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    // Same cleanup as the other failure paths: dbInfo is not handed back to the caller.
    ctgFreeVgInfo(dbInfo);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_UPDATE_VGROUP;
  op->syncOp = syncOp;

  SCtgUpdateVgMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateVgMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg));
    taosMemoryFree(op);
    ctgFreeVgInfo(dbInfo);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  // For system databases, skip everything up to and including the first '.'.
  const char *p = strchr(dbFName, '.');
  if (p && IS_SYS_DBNAME(p + 1)) {
    dbFName = p + 1;
  }

  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  msg->pCtg = pCtg;
  msg->dbId = dbId;
  msg->dbInfo = dbInfo;

  op->data = msg;

  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  ctgFreeVgInfo(dbInfo);
  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
815 816
/*
 * Build a CTG_OP_UPDATE_TB_META cache operation carrying `output` and pass it to ctgEnqueue().
 *
 * pCtg    catalog handle recorded in the message
 * output  table meta output attached to the message; on every failure path of this function
 *         output->tbMeta and output itself are freed
 * syncOp  stored in the operation's syncOp field
 *
 * Returns TSDB_CODE_SUCCESS when the operation was enqueued, TSDB_CODE_OUT_OF_MEMORY when an
 * allocation fails, otherwise the code reported by ctgEnqueue().
 */
int32_t ctgUpdateTbMetaEnqueue(SCatalog *pCtg, STableMetaOutput *output, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_UPDATE_TB_META;
  op->syncOp = syncOp;

  SCtgUpdateTbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbMetaMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbMetaMsg));
    taosMemoryFree(op);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  char *p = strchr(output->dbFName, '.');
  if (p && IS_SYS_DBNAME(p + 1)) {
    // Shift the bare system db name to the front of the buffer together with its terminating
    // NUL. Copying only strlen() bytes would leave the tail of the old, longer name in place.
    // Source and destination overlap inside the same buffer, hence memmove.
    memmove(output->dbFName, p + 1, strlen(p + 1) + 1);
  }

  msg->pCtg = pCtg;
  msg->pMeta = output;

  op->data = msg;

  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  if (output) {
    taosMemoryFree(output->tbMeta);
    taosMemoryFree(output);
  }

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
853 854
/*
 * Build a CTG_OP_UPDATE_VG_EPSET cache operation and pass it to ctgEnqueue().
 *
 * pCtg     catalog handle recorded in the message
 * dbFName  db name, copied (bounded) into the message
 * vgId     vgroup id recorded in the message
 * pEpSet   endpoint set; copied by value into the message
 *
 * Returns TSDB_CODE_SUCCESS when the operation was enqueued, TSDB_CODE_OUT_OF_MEMORY when an
 * allocation fails, otherwise the code reported by ctgEnqueue().
 */
int32_t ctgUpdateVgEpsetEnqueue(SCatalog *pCtg, char *dbFName, int32_t vgId, SEpSet *pEpSet) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_UPDATE_VG_EPSET;

  SCtgUpdateEpsetMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateEpsetMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateEpsetMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  msg->vgId = vgId;
  msg->epSet = *pEpSet;

  op->data = msg;

  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
881 882
/*
 * Build a CTG_OP_UPDATE_USER cache operation and pass it to ctgEnqueue().
 *
 * pCtg    catalog handle recorded in the message
 * pAuth   user auth response; copied by value (shallow) into the message. On every failure path
 *         of this function its contents are released with tFreeSGetUserAuthRsp().
 * syncOp  stored in the operation's syncOp field
 *
 * Returns TSDB_CODE_SUCCESS when the operation was enqueued, TSDB_CODE_OUT_OF_MEMORY when an
 * allocation fails, otherwise the code reported by ctgEnqueue().
 */
int32_t ctgUpdateUserEnqueue(SCatalog *pCtg, SGetUserAuthRsp *pAuth, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_UPDATE_USER;
  op->syncOp = syncOp;

  SCtgUpdateUserMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateUserMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateUserMsg));
    taosMemoryFree(op);
    // Jump to the shared cleanup so pAuth is released exactly as on an enqueue failure.
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  msg->userAuth = *pAuth;

  op->data = msg;

  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  tFreeSGetUserAuthRsp(pAuth);

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
910 911
/*
 * Build a CTG_OP_UPDATE_TB_INDEX cache operation carrying *pIndex and pass it to ctgEnqueue().
 *
 * pCtg    catalog handle recorded in the message
 * pIndex  in/out: *pIndex is attached to the message. On success *pIndex is set to NULL; on
 *         failure the index array and the index object are freed and *pIndex is cleared.
 * syncOp  stored in the operation's syncOp field
 *
 * Returns TSDB_CODE_SUCCESS when the operation was enqueued, TSDB_CODE_OUT_OF_MEMORY when an
 * allocation fails, otherwise the code reported by ctgEnqueue().
 */
int32_t ctgUpdateTbIndexEnqueue(SCatalog *pCtg, STableIndex **pIndex, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_UPDATE_TB_INDEX;
  op->syncOp = syncOp;

  SCtgUpdateTbIndexMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbIndexMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbIndexMsg));
    taosMemoryFree(op);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  msg->pIndex = *pIndex;

  op->data = msg;

  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  // The index now belongs to the queued message; detach it from the caller.
  *pIndex = NULL;
  return TSDB_CODE_SUCCESS;

_return:

  taosArrayDestroyEx((*pIndex)->pIndex, tFreeSTableIndexInfo);
  taosMemoryFreeClear(*pIndex);

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
941 942
/*
 * Build a CTG_OP_DROP_TB_INDEX cache operation for the table named by pName and pass it to
 * ctgEnqueue().
 *
 * pCtg    catalog handle recorded in the message
 * pName   table name; its full db name and its tname are copied into the message
 * syncOp  stored in the operation's syncOp field
 *
 * Returns TSDB_CODE_SUCCESS when the operation was enqueued, TSDB_CODE_OUT_OF_MEMORY when an
 * allocation fails, otherwise the code reported by ctgEnqueue().
 */
int32_t ctgDropTbIndexEnqueue(SCatalog *pCtg, SName *pName, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_DROP_TB_INDEX;
  op->syncOp = syncOp;

  SCtgDropTbIndexMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTbIndexMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTbIndexMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  tNameGetFullDbName(pName, msg->dbFName);
  // Bounded copy, as in the other enqueue helpers, instead of an unchecked strcpy.
  tstrncpy(msg->tbName, pName->tname, sizeof(msg->tbName));

  op->data = msg;

  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
969 970
/*
 * Build a CTG_OP_CLEAR_CACHE cache operation and pass it to ctgEnqueue().
 *
 * pCtg       catalog handle recorded in the message
 * freeCtg    stored in the message's freeCtg field
 * stopQueue  stored in the operation's stopQueue field
 * syncOp     stored in the operation's syncOp field
 *
 * The operation is always created with unLocked set to true.
 *
 * Returns TSDB_CODE_SUCCESS when the operation was enqueued, TSDB_CODE_OUT_OF_MEMORY when an
 * allocation fails, otherwise the code reported by ctgEnqueue().
 */
int32_t ctgClearCacheEnqueue(SCatalog *pCtg, bool freeCtg, bool stopQueue, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_CLEAR_CACHE;
  op->syncOp = syncOp;
  op->stopQueue = stopQueue;
  op->unLocked = true;

  SCtgClearCacheMsg *msg = taosMemoryMalloc(sizeof(SCtgClearCacheMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgClearCacheMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  msg->freeCtg = freeCtg;
  op->data = msg;

  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

D
dapan1121 已提交
997 998 999 1000 1001 1002
/*
 * Initialise a rent manager: reset the read index, derive the slot count from rentSec and
 * allocate a zeroed slot array.
 *
 * mgmt     rent manager to initialise
 * rentSec  rent period; the slot count is rentSec / CTG_RENT_SLOT_SECOND
 * type     stored in mgmt->type
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the slot array cannot be allocated.
 */
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) {
  mgmt->type = type;
  mgmt->slotRIdx = 0;
  mgmt->slotNum = rentSec / CTG_RENT_SLOT_SECOND;

  size_t slotBytes = sizeof(SCtgRentSlot) * mgmt->slotNum;
  mgmt->slots = taosMemoryCalloc(1, slotBytes);

  if (NULL == mgmt->slots) {
    qError("calloc %d failed", (int32_t)slotBytes);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  qDebug("meta rent initialized, type:%d, slotNum:%d", type, mgmt->slotNum);

  return TSDB_CODE_SUCCESS;
}

/*
 * Append one meta record to the rent slot selected by id.
 *
 * mgmt  rent manager
 * meta  record to append (pushed into the slot array)
 * id    key used to pick the slot: |id % slotNum|
 * size  element size used when the slot array has to be created
 *
 * The slot's write lock is held for the whole update. The slot is flagged as needing a sort
 * after a successful push.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the array cannot be created or grown.
 */
int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size) {
  int32_t       code = 0;
  int16_t       slotIdx = abs((int)(id % mgmt->slotNum));
  SCtgRentSlot *pSlot = &mgmt->slots[slotIdx];

  CTG_LOCK(CTG_WRITE, &pSlot->lock);

  // Create the slot array lazily on first use.
  if (NULL == pSlot->meta) {
    pSlot->meta = taosArrayInit(CTG_DEFAULT_RENT_SLOT_SIZE, size);
  }

  if (NULL == pSlot->meta) {
    qError("taosArrayInit %d failed, id:0x%" PRIx64 ", slot idx:%d, type:%d", CTG_DEFAULT_RENT_SLOT_SIZE, id, slotIdx,
           mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  if (NULL == taosArrayPush(pSlot->meta, meta)) {
    qError("taosArrayPush meta to rent failed, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  pSlot->needSort = true;

  qDebug("add meta to rent, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);

_return:

  CTG_UNLOCK(CTG_WRITE, &pSlot->lock);
  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1046 1047
/*
 * Overwrite the rent record with key id in its slot; if the slot is empty or the record is
 * missing, fall back to adding it with ctgMetaRentAdd().
 *
 * mgmt           rent manager
 * meta           new record contents (size bytes are copied over the existing record)
 * id             key used to pick the slot and to search within it
 * size           record size in bytes
 * sortCompare    comparator used to sort the slot when it is flagged as unsorted
 * searchCompare  comparator used to look up id in the sorted slot
 *
 * The slot's write lock is held for the lookup and copy, and released before the fallback add.
 */
int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t sortCompare,
                          __compar_fn_t searchCompare) {
  int32_t       code = 0;
  int16_t       slotIdx = abs((int)(id % mgmt->slotNum));
  SCtgRentSlot *pSlot = &mgmt->slots[slotIdx];
  void         *pExisting = NULL;

  CTG_LOCK(CTG_WRITE, &pSlot->lock);

  if (NULL == pSlot->meta) {
    qDebug("empty meta slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  // The search below requires a sorted slot.
  if (pSlot->needSort) {
    qDebug("meta slot before sorte, slot idx:%d, type:%d, size:%d", slotIdx, mgmt->type,
           (int32_t)taosArrayGetSize(pSlot->meta));
    taosArraySort(pSlot->meta, sortCompare);
    pSlot->needSort = false;
    qDebug("meta slot sorted, slot idx:%d, type:%d, size:%d", slotIdx, mgmt->type,
           (int32_t)taosArrayGetSize(pSlot->meta));
  }

  pExisting = taosArraySearch(pSlot->meta, &id, searchCompare, TD_EQ);
  if (NULL == pExisting) {
    qDebug("meta not found in slot, id:0x%" PRIx64 ", slot idx:%d, type:%d, size:%d", id, slotIdx, mgmt->type,
           (int32_t)taosArrayGetSize(pSlot->meta));
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  memcpy(pExisting, meta, size);

  qDebug("meta in rent updated, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);

_return:

  CTG_UNLOCK(CTG_WRITE, &pSlot->lock);

  if (0 == code) {
    CTG_RET(code);
  }

  // In-place update was not possible: insert the record instead.
  qDebug("meta in rent update failed, will try to add it, code:%x, id:0x%" PRIx64 ", slot idx:%d, type:%d", code, id,
         slotIdx, mgmt->type);
  CTG_RET(ctgMetaRentAdd(mgmt, meta, id, size));
}

/*
 * Remove the rent record with key id from its slot.
 *
 * mgmt           rent manager
 * id             key used to pick the slot and to search within it
 * sortCompare    comparator used to sort the slot when it is flagged as unsorted
 * searchCompare  comparator used to look up id in the sorted slot
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CTG_INTERNAL_ERROR when the slot is empty or the
 * record is not found. The slot's write lock is held throughout.
 */
int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t sortCompare, __compar_fn_t searchCompare) {
  int32_t       code = 0;
  int32_t       pos = -1;
  int16_t       slotIdx = abs((int)(id % mgmt->slotNum));
  SCtgRentSlot *pSlot = &mgmt->slots[slotIdx];

  CTG_LOCK(CTG_WRITE, &pSlot->lock);

  if (NULL == pSlot->meta) {
    qError("empty meta slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  // The index search below requires a sorted slot.
  if (pSlot->needSort) {
    taosArraySort(pSlot->meta, sortCompare);
    pSlot->needSort = false;
    qDebug("meta slot sorted, slot idx:%d, type:%d", slotIdx, mgmt->type);
  }

  pos = taosArraySearchIdx(pSlot->meta, &id, searchCompare, TD_EQ);
  if (pos < 0) {
    qError("meta not found in slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  taosArrayRemove(pSlot->meta, pos);

  qDebug("meta in rent removed, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);

_return:

  CTG_UNLOCK(CTG_WRITE, &pSlot->lock);

  CTG_RET(code);
}

/*
 * Advance the rent read index by one slot and return a heap copy of that slot's records.
 *
 * mgmt  rent manager
 * res   out: newly allocated buffer holding the slot's records, or NULL when the slot has no
 *       records or on failure
 * num   out: number of records copied into *res (0 when *res is NULL)
 * size  size in bytes of one record
 *
 * Returns TSDB_CODE_SUCCESS (also for an empty slot), or TSDB_CODE_OUT_OF_MEMORY when the copy
 * buffer cannot be allocated. The slot's read lock is held while its contents are copied.
 */
int32_t ctgMetaRentGetImpl(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
  // Give both outputs a defined value on every path, including the empty-slot and
  // allocation-failure exits.
  *res = NULL;
  *num = 0;

  int16_t ridx = atomic_add_fetch_16(&mgmt->slotRIdx, 1);
  if (ridx >= mgmt->slotNum) {
    // Wrap around and publish the wrapped index.
    ridx %= mgmt->slotNum;
    atomic_store_16(&mgmt->slotRIdx, ridx);
  }

  SCtgRentSlot *slot = &mgmt->slots[ridx];
  int32_t       code = 0;

  CTG_LOCK(CTG_READ, &slot->lock);
  if (NULL == slot->meta) {
    qDebug("empty meta in slot:%d, type:%d", ridx, mgmt->type);
    goto _return;
  }

  size_t metaNum = taosArrayGetSize(slot->meta);
  if (metaNum <= 0) {
    qDebug("no meta in slot:%d, type:%d", ridx, mgmt->type);
    goto _return;
  }

  size_t msize = metaNum * size;
  *res = taosMemoryMalloc(msize);
  if (NULL == *res) {
    qError("malloc %d failed", (int32_t)msize);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  // Copy all records in one go, starting from the address of element 0.
  void *meta = taosArrayGet(slot->meta, 0);

  memcpy(*res, meta, msize);

  *num = (uint32_t)metaNum;

  qDebug("Got %d meta from rent, type:%d", (int32_t)metaNum, mgmt->type);

_return:

  CTG_UNLOCK(CTG_READ, &slot->lock);

  CTG_RET(code);
}

/*
 * Rate-limited front end for ctgMetaRentGetImpl().
 *
 * If less than CTG_RENT_SLOT_SECOND seconds have passed since the last recorded read, *res is
 * set to NULL, *num to 0 and TSDB_CODE_SUCCESS is returned. Otherwise the last-read timestamp is
 * advanced with a compare-and-swap (retried when another update got in first) and the next slot
 * is fetched.
 */
int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
  for (;;) {
    int64_t nowMs = taosGetTimestampMs();
    int64_t lastMs = atomic_load_64(&mgmt->lastReadMsec);

    if ((nowMs - lastMs) < CTG_RENT_SLOT_SECOND * 1000) {
      *res = NULL;
      *num = 0;
      qDebug("too short time period to get expired meta, type:%d", mgmt->type);
      return TSDB_CODE_SUCCESS;
    }

    // Claim this read window; leave the loop only when our timestamp was installed.
    if (lastMs == atomic_val_compare_exchange_64(&mgmt->lastReadMsec, lastMs, nowMs)) {
      break;
    }
  }

  CTG_ERR_RET(ctgMetaRentGetImpl(mgmt, res, num, size));

  return TSDB_CODE_SUCCESS;
}

/*
 * Create an empty db cache entry (table hash + super-table hash), insert it into pCtg->dbCache
 * under dbFName and register the db in the db rent with vgVersion -1.
 *
 * Once the entry has been put into the hash it is no longer freed here; before that point the
 * partially built entry is released with ctgFreeDbCache() on failure, including the case where
 * the db is already present in the cache.
 */
int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
  int32_t     code = 0;
  SCtgDBCache newDBCache = {.dbId = dbId};

  newDBCache.tbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY),
                                    true, HASH_ENTRY_LOCK);
  if (NULL == newDBCache.tbCache) {
    ctgError("taosHashInit %d metaCache failed", gCtgMgmt.cfg.maxTblCacheNum);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  newDBCache.stbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT),
                                     true, HASH_ENTRY_LOCK);
  if (NULL == newDBCache.stbCache) {
    ctgError("taosHashInit %d stbCache failed", gCtgMgmt.cfg.maxTblCacheNum);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  code = taosHashPut(pCtg->dbCache, dbFName, strlen(dbFName), &newDBCache, sizeof(SCtgDBCache));
  if (code && HASH_NODE_EXIST(code)) {
    ctgDebug("db already in cache, dbFName:%s", dbFName);
    goto _return;
  }

  if (code) {
    ctgError("taosHashPut db to cache failed, dbFName:%s", dbFName);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  CTG_CACHE_STAT_INC(numOfDb, 1);

  SDbVgVersion vgVersion = {.dbId = newDBCache.dbId, .vgVersion = -1};
  tstrncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));

  ctgDebug("db added to cache, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId);

  // Plain return on failure: the entry was copied into the hash above and must not be freed here.
  CTG_ERR_RET(ctgMetaRentAdd(&pCtg->dbRent, &vgVersion, dbId, sizeof(SDbVgVersion)));

  ctgDebug("db added to rent, dbFName:%s, vgVersion:%d, dbId:0x%" PRIx64, dbFName, vgVersion.vgVersion, dbId);

  return TSDB_CODE_SUCCESS;

_return:

  ctgFreeDbCache(&newDBCache);

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1246
/*
 * Walk dbCache->stbCache and remove every key (a super-table uid) from the catalog's stb rent.
 * Does nothing when the db has no stbCache. Removal failures are ignored; only successes are logged.
 */
void ctgRemoveStbRent(SCatalog *pCtg, SCtgDBCache *dbCache) {
  if (NULL == dbCache->stbCache) {
    return;
  }

  for (void *pIter = taosHashIterate(dbCache->stbCache, NULL); pIter != NULL;
       pIter = taosHashIterate(dbCache->stbCache, pIter)) {
    uint64_t *pSuid = taosHashGetKey(pIter, NULL);

    int32_t rc = ctgMetaRentRemove(&pCtg->stbRent, *pSuid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare);
    if (TSDB_CODE_SUCCESS == rc) {
      ctgDebug("stb removed from rent, suid:0x%" PRIx64, *pSuid);
    }
  }
}

dengyihao's avatar
dengyihao 已提交
1265
/*
 * Tear down one db cache entry: mark it deleted, drop its super tables from the stb rent and
 * free its contents under the db write lock, then remove the db from the db rent and from
 * pCtg->dbCache.
 *
 * Returns TSDB_CODE_SUCCESS, the error from ctgMetaRentRemove(), or TSDB_CODE_CTG_DB_DROPPED
 * when the hash entry could not be removed.
 */
int32_t ctgRemoveDBFromCache(SCatalog *pCtg, SCtgDBCache *dbCache, const char *dbFName) {
  // Remember the id now; the cache contents are freed below.
  uint64_t dbId = dbCache->dbId;

  ctgInfo("start to remove db from cache, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId);

  CTG_LOCK(CTG_WRITE, &dbCache->dbLock);
  atomic_store_8(&dbCache->deleted, 1);
  ctgRemoveStbRent(pCtg, dbCache);
  ctgFreeDbCache(dbCache);
  CTG_UNLOCK(CTG_WRITE, &dbCache->dbLock);

  CTG_ERR_RET(ctgMetaRentRemove(&pCtg->dbRent, dbId, ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare));
  ctgDebug("db removed from rent, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId);

  if (0 != taosHashRemove(pCtg->dbCache, dbFName, strlen(dbFName))) {
    ctgInfo("taosHashRemove from dbCache failed, may be removed, dbFName:%s", dbFName);
    CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
  }

  CTG_CACHE_STAT_DEC(numOfDb, 1);
  ctgInfo("db removed from cache, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1292 1293
/*
 * Look up the cache entry for dbFName, creating it when needed.
 *
 * An existing entry is returned as-is when dbId is 0 or matches the cached id. When the cached
 * id is 0 it is first set to dbId. An entry whose non-zero id differs from dbId is removed and
 * a fresh one is added in its place.
 *
 * pCache  out: the entry found by the final lookup
 */
int32_t ctgGetAddDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId, SCtgDBCache **pCache) {
  int32_t      code = 0;
  SCtgDBCache *pFound = NULL;

  ctgGetDBCache(pCtg, dbFName, &pFound);

  if (pFound != NULL) {
    // TODO OPEN IT
#if 0
    if (pFound->dbId == dbId) {
      *pCache = pFound;
      return TSDB_CODE_SUCCESS;
    }
#else
    // No id requested, or the ids agree: reuse the entry untouched.
    if (0 == dbId || pFound->dbId == dbId) {
      *pCache = pFound;
      return TSDB_CODE_SUCCESS;
    }

    // The entry has no id yet: adopt the requested one.
    if (0 == pFound->dbId) {
      pFound->dbId = dbId;
      *pCache = pFound;
      return TSDB_CODE_SUCCESS;
    }
#endif
    // Ids differ: drop the stale entry before adding a new one.
    CTG_ERR_RET(ctgRemoveDBFromCache(pCtg, pFound, dbFName));
  }

  CTG_ERR_RET(ctgAddNewDBCache(pCtg, dbFName, dbId));

  ctgGetDBCache(pCtg, dbFName, &pFound);

  *pCache = pFound;

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1333 1334
/*
 * Build a super-table version record from the cached meta and index of a table and write it
 * into the stb rent via ctgMetaRentUpdate().
 *
 * sversion/tversion are taken from pCache->pMeta and smaVer from pCache->pIndex when those
 * pointers are set; otherwise the fields stay zero.
 */
int32_t ctgUpdateRentStbVersion(SCatalog *pCtg, char *dbFName, char *tbName, uint64_t dbId, uint64_t suid,
                                SCtgTbCache *pCache) {
  SSTableVersion stbVer = {.dbId = dbId, .suid = suid};

  tstrncpy(stbVer.dbFName, dbFName, sizeof(stbVer.dbFName));
  tstrncpy(stbVer.stbName, tbName, sizeof(stbVer.stbName));

  if (pCache->pMeta) {
    stbVer.sversion = pCache->pMeta->sversion;
    stbVer.tversion = pCache->pMeta->tversion;
  }

  if (pCache->pIndex) {
    stbVer.smaVer = pCache->pIndex->version;
  }

  CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->stbRent, &stbVer, stbVer.suid, sizeof(SSTableVersion),
                                ctgStbVersionSortCompare, ctgStbVersionSearchCompare));

  ctgDebug("db %s,0x%" PRIx64 " stb %s,0x%" PRIx64 " sver %d tver %d smaVer %d updated to stbRent", dbFName, dbId,
           tbName, suid, stbVer.sversion, stbVer.tversion, stbVer.smaVer);

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
1356

dengyihao's avatar
dengyihao 已提交
1357 1358
/*
 * Store `meta` as the cached table meta for tbName in dbCache.
 *
 * pCtg      catalog handle
 * dbCache   db cache entry to update; both of its hashes must still exist
 * dbFName   db name, used for logging and for the stb rent record
 * dbId      db id passed on to the stb rent record
 * tbName    table name used as the tbCache key
 * meta      heap-allocated table meta. It is stored in the cache on success and freed here when
 *           the update is rejected, ignored, or the cache insert fails.
 * metaSize  not used by this function
 *
 * An update is ignored when the cached meta has the same type and uid and is either a child
 * table or already at the same or newer schema/tag version. For super tables the suid -> name
 * mapping in stbCache and the stb rent record are refreshed as well.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_CTG_DB_DROPPED when the db hashes are gone,
 * TSDB_CODE_OUT_OF_MEMORY when a hash insert fails, or the error from ctgUpdateRentStbVersion().
 */
int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, uint64_t dbId, char *tbName,
                              STableMeta *meta, int32_t metaSize) {
  if (NULL == dbCache->tbCache || NULL == dbCache->stbCache) {
    taosMemoryFree(meta);
    ctgError("db is dropping, dbId:0x%" PRIx64, dbCache->dbId);
    CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
  }

  bool         isStb = meta->tableType == TSDB_SUPER_TABLE;
  SCtgTbCache *pCache = taosHashGet(dbCache->tbCache, tbName, strlen(tbName));
  STableMeta  *orig = (pCache ? pCache->pMeta : NULL);
  int8_t       origType = 0;

  if (orig) {
    origType = orig->tableType;

    if (origType == meta->tableType && orig->uid == meta->uid &&
        (origType == TSDB_CHILD_TABLE || (orig->sversion >= meta->sversion && orig->tversion >= meta->tversion))) {
      taosMemoryFree(meta);
      ctgDebug("ignore table %s meta update", tbName);
      return TSDB_CODE_SUCCESS;
    }

    // The old meta is about to be replaced: drop its suid mapping first.
    if (origType == TSDB_SUPER_TABLE) {
      if (taosHashRemove(dbCache->stbCache, &orig->suid, sizeof(orig->suid))) {
        ctgError("stb not exist in stbCache, dbFName:%s, stb:%s, suid:0x%" PRIx64, dbFName, tbName, orig->suid);
      } else {
        CTG_CACHE_STAT_DEC(numOfStb, 1);
        ctgDebug("stb removed from stbCache, dbFName:%s, stb:%s, suid:0x%" PRIx64, dbFName, tbName, orig->suid);
      }
    }
  }

  if (NULL == pCache) {
    SCtgTbCache cache = {0};
    cache.pMeta = meta;
    if (taosHashPut(dbCache->tbCache, tbName, strlen(tbName), &cache, sizeof(SCtgTbCache)) != 0) {
      // Log before releasing meta: the message reads meta->tableType.
      ctgError("taosHashPut new tbCache failed, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
      taosMemoryFree(meta);
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    pCache = taosHashGet(dbCache->tbCache, tbName, strlen(tbName));
  } else {
    CTG_LOCK(CTG_WRITE, &pCache->metaLock);
    taosMemoryFree(pCache->pMeta);
    pCache->pMeta = meta;
    CTG_UNLOCK(CTG_WRITE, &pCache->metaLock);
  }

  // `orig` is only compared against NULL here; it may already have been freed above.
  if (NULL == orig) {
    CTG_CACHE_STAT_INC(numOfTbl, 1);
  }

  ctgDebug("tbmeta updated to cache, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
  ctgdShowTableMeta(pCtg, tbName, meta);

  if (!isStb) {
    return TSDB_CODE_SUCCESS;
  }

  if (taosHashPut(dbCache->stbCache, &meta->suid, sizeof(meta->suid), tbName, strlen(tbName) + 1) != 0) {
    ctgError("taosHashPut to stable cache failed, suid:0x%" PRIx64, meta->suid);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  CTG_CACHE_STAT_INC(numOfStb, 1);

  ctgDebug("stb 0x%" PRIx64 " updated to cache, dbFName:%s, tbName:%s, tbType:%d", meta->suid, dbFName, tbName,
           meta->tableType);

  if (pCache) {
    CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbId, meta->suid, pCache));
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1435
/*
 * Store *index as the cached table index for tbName in dbCache.
 *
 * index  in/out: on success the index is kept by the cache and *index is set to NULL. When the
 *        db is being dropped or the cache insert fails, the index is freed and *index cleared.
 *
 * When a suid is known (from the new index, or from the replaced index if the new one has suid
 * 0), the stb rent record is refreshed as well.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_CTG_DB_DROPPED, TSDB_CODE_OUT_OF_MEMORY, or the error
 * from ctgUpdateRentStbVersion().
 */
int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, char *tbName, STableIndex **index) {
  if (NULL == dbCache->tbCache) {
    ctgFreeSTableIndex(*index);
    taosMemoryFreeClear(*index);
    ctgError("db is dropping, dbId:0x%" PRIx64, dbCache->dbId);
    CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
  }

  STableIndex *pNewIndex = *index;
  uint64_t     rentSuid = pNewIndex->suid;
  SCtgTbCache *pEntry = taosHashGet(dbCache->tbCache, tbName, strlen(tbName));

  if (pEntry != NULL) {
    // Existing table entry: swap the index under the index write lock.
    CTG_LOCK(CTG_WRITE, &pEntry->indexLock);

    if (pEntry->pIndex) {
      if (0 == rentSuid) {
        rentSuid = pEntry->pIndex->suid;
      }
      taosArrayDestroyEx(pEntry->pIndex->pIndex, tFreeSTableIndexInfo);
      taosMemoryFreeClear(pEntry->pIndex);
    }

    pEntry->pIndex = pNewIndex;
    CTG_UNLOCK(CTG_WRITE, &pEntry->indexLock);

    *index = NULL;

    ctgDebug("table %s index updated to cache, ver:%d, num:%d", tbName, pNewIndex->version,
             (int32_t)taosArrayGetSize(pNewIndex->pIndex));

    if (rentSuid) {
      CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbCache->dbId, rentSuid, pEntry));
    }

    return TSDB_CODE_SUCCESS;
  }

  // No entry for this table yet: insert one that only carries the index.
  SCtgTbCache newEntry = {0};
  newEntry.pIndex = pNewIndex;

  if (taosHashPut(dbCache->tbCache, tbName, strlen(tbName), &newEntry, sizeof(newEntry)) != 0) {
    ctgFreeSTableIndex(*index);
    taosMemoryFreeClear(*index);
    ctgError("taosHashPut new tbCache failed, tbName:%s", tbName);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  *index = NULL;
  ctgDebug("table %s index updated to cache, ver:%d, num:%d", tbName, pNewIndex->version,
           (int32_t)taosArrayGetSize(pNewIndex->pIndex));

  if (rentSuid) {
    CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbCache->dbId, pNewIndex->suid, &newEntry));
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1493 1494 1495 1496
/*
 * Clone pOut and enqueue the clone as a table-meta cache update.
 *
 * The local pointer to the clone is cleared right after the enqueue call, whatever its result,
 * so the cleanup at _return always receives NULL.
 */
int32_t ctgUpdateTbMetaToCache(SCatalog *pCtg, STableMetaOutput *pOut, bool syncReq) {
  int32_t           code = 0;
  STableMetaOutput *pCloned = NULL;

  CTG_ERR_RET(ctgCloneMetaOutput(pOut, &pCloned));

  code = ctgUpdateTbMetaEnqueue(pCtg, pCloned, syncReq);
  pCloned = NULL;
  CTG_ERR_JRET(code);

  return TSDB_CODE_SUCCESS;

_return:

  ctgFreeSTableMetaOutput(pCloned);
  CTG_RET(code);
}

D
dapan1121 已提交
1510
void ctgClearAllInstance(void) {
dengyihao's avatar
dengyihao 已提交
1511
  SCatalog *pCtg = NULL;
1512

dengyihao's avatar
dengyihao 已提交
1513
  void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
1514
  while (pIter) {
dengyihao's avatar
dengyihao 已提交
1515
    pCtg = *(SCatalog **)pIter;
1516 1517

    if (pCtg) {
D
dapan1121 已提交
1518 1519 1520 1521 1522 1523 1524 1525
      ctgClearHandle(pCtg);
    }

    pIter = taosHashIterate(gCtgMgmt.pCluster, pIter);
  }
}

void ctgFreeAllInstance(void) {
dengyihao's avatar
dengyihao 已提交
1526
  SCatalog *pCtg = NULL;
D
dapan1121 已提交
1527

dengyihao's avatar
dengyihao 已提交
1528
  void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
D
dapan1121 已提交
1529
  while (pIter) {
dengyihao's avatar
dengyihao 已提交
1530
    pCtg = *(SCatalog **)pIter;
D
dapan1121 已提交
1531 1532 1533

    if (pCtg) {
      ctgFreeHandle(pCtg);
1534 1535 1536 1537 1538 1539 1540 1541
    }

    pIter = taosHashIterate(gCtgMgmt.pCluster, pIter);
  }

  taosHashClear(gCtgMgmt.pCluster);
}

D
dapan1121 已提交
1542
/*
 * Cache operation: install the vgroup info carried by the message into the
 * db cache and refresh the db rent entry.
 *
 * operation->data is a SCtgUpdateVgMsg. The message is always freed before
 * returning; msg->dbInfo is freed too unless it was handed over to the cache.
 *
 * The update is skipped (returning success) when the message has no vgHash,
 * when the cached vgVersion is newer, or when both vgVersion and numOfTable
 * equal the cached values.
 *
 * Returns TSDB_CODE_SUCCESS or the first error hit.
 */
int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
  int32_t          code = 0;
  SCtgUpdateVgMsg *msg = operation->data;
  SCatalog        *pCtg = msg->pCtg;
  char            *dbFName = msg->dbFName;
  SDBVgInfo       *dbInfo = msg->dbInfo;
  SCtgDBCache     *dbCache = NULL;
  SCtgVgCache     *vgCache = NULL;
  SDbVgVersion     vgVersion = {0};

  if (NULL == dbInfo->vgHash) {
    goto _return;
  }

  if (dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) {
    ctgError("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d, vgHashSize:%d", dbFName, dbInfo->vgHash,
             dbInfo->vgVersion, taosHashGetSize(dbInfo->vgHash));
    CTG_ERR_JRET(TSDB_CODE_APP_ERROR);
  }

  // Snapshot the version data now; dbInfo may be handed over to the cache below.
  vgVersion.dbId = msg->dbId;
  vgVersion.vgVersion = dbInfo->vgVersion;
  vgVersion.numOfTable = dbInfo->numOfTable;

  CTG_ERR_JRET(ctgGetAddDBCache(pCtg, dbFName, msg->dbId, &dbCache));
  if (NULL == dbCache) {
    ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:0x%" PRIx64, dbFName, msg->dbId);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  vgCache = &dbCache->vgCache;
  CTG_ERR_JRET(ctgWLockVgInfo(pCtg, dbCache));

  SDBVgInfo *cached = vgCache->vgInfo;
  if (NULL != cached) {
    bool skipUpdate = false;

    if (dbInfo->vgVersion < cached->vgVersion) {
      ctgDebug("db vgVer is old, dbFName:%s, vgVer:%d, curVer:%d", dbFName, dbInfo->vgVersion, cached->vgVersion);
      skipUpdate = true;
    } else if (dbInfo->vgVersion == cached->vgVersion && dbInfo->numOfTable == cached->numOfTable) {
      ctgDebug("no new db vgVer or numOfTable, dbFName:%s, vgVer:%d, numOfTable:%d", dbFName, dbInfo->vgVersion,
               dbInfo->numOfTable);
      skipUpdate = true;
    }

    if (skipUpdate) {
      ctgWUnlockVgInfo(dbCache);
      goto _return;
    }

    ctgFreeVgInfo(cached);
  }

  // Ownership of dbInfo moves to the cache; clear the message pointer so the
  // cleanup path does not free it.
  vgCache->vgInfo = dbInfo;
  msg->dbInfo = NULL;

  ctgDebug("db vgInfo updated, dbFName:%s, vgVer:%d, dbId:0x%" PRIx64, dbFName, vgVersion.vgVersion, vgVersion.dbId);

  ctgWUnlockVgInfo(dbCache);

  dbCache = NULL;

  tstrncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
  CTG_ERR_JRET(ctgMetaRentUpdate(&pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion),
                                 ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare));

_return:

  ctgFreeVgInfo(msg->dbInfo);
  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
1614
/*
 * Cache operation: remove a whole db from the catalog cache.
 *
 * operation->data is a SCtgDropDBMsg and is always freed before returning.
 * Nothing is removed when the db is not cached or when the cached dbId
 * differs from the one in the message.
 *
 * Returns TSDB_CODE_SUCCESS or the error from ctgRemoveDBFromCache.
 */
int32_t ctgOpDropDbCache(SCtgCacheOperation *operation) {
  int32_t        code = 0;
  SCtgDropDBMsg *msg = operation->data;
  SCatalog      *pCtg = msg->pCtg;
  SCtgDBCache   *dbCache = NULL;

  ctgGetDBCache(pCtg, msg->dbFName, &dbCache);

  if (NULL != dbCache) {
    if (dbCache->dbId == msg->dbId) {
      CTG_ERR_JRET(ctgRemoveDBFromCache(pCtg, dbCache, msg->dbFName));
    } else {
      // The cached db carries a different id than the drop request targets.
      ctgInfo("dbId already updated, dbFName:%s, dbId:0x%" PRIx64 ", targetId:0x%" PRIx64, msg->dbFName, dbCache->dbId,
              msg->dbId);
    }
  }

_return:

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
1640
/*
 * Cache operation: drop the cached vgroup info of one db while keeping the
 * db cache entry itself.
 *
 * operation->data is a SCtgDropDbVgroupMsg and is always freed before
 * returning. A db that is not cached is silently ignored.
 *
 * Returns TSDB_CODE_SUCCESS or the error from ctgWLockVgInfo.
 */
int32_t ctgOpDropDbVgroup(SCtgCacheOperation *operation) {
  int32_t              code = 0;
  SCtgDropDbVgroupMsg *msg = operation->data;
  SCatalog            *pCtg = msg->pCtg;
  SCtgDBCache         *dbCache = NULL;

  ctgGetDBCache(pCtg, msg->dbFName, &dbCache);

  if (NULL != dbCache) {
    CTG_ERR_JRET(ctgWLockVgInfo(pCtg, dbCache));

    // Release the vgroup info under the write lock and leave an empty slot.
    ctgFreeVgInfo(dbCache->vgCache.vgInfo);
    dbCache->vgCache.vgInfo = NULL;

    ctgDebug("db vgInfo removed, dbFName:%s", msg->dbFName);

    ctgWUnlockVgInfo(dbCache);
  }

_return:

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
1667
/*
 * Cache operation: write table meta carried by the message into the cache.
 *
 * operation->data is a SCtgUpdateTbMetaMsg. Depending on pMeta->metaType the
 * table meta (tbMeta), the child-table meta (a heap copy of ctbMeta), or both
 * are written through ctgWriteTbMetaToCache. The message, pMeta and any
 * tbMeta still attached to pMeta are freed before returning.
 *
 * Returns TSDB_CODE_SUCCESS or the first error hit.
 */
int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *operation) {
  int32_t              code = 0;
  SCtgUpdateTbMetaMsg *msg = operation->data;
  SCatalog            *pCtg = msg->pCtg;
  STableMetaOutput    *pMeta = msg->pMeta;
  SCtgDBCache         *dbCache = NULL;

  // Anything other than a pure child-table response must carry a tbMeta.
  if (NULL == pMeta->tbMeta && !CTG_IS_META_CTABLE(pMeta->metaType)) {
    ctgError("no valid tbmeta got from meta rsp, dbFName:%s, tbName:%s", pMeta->dbFName, pMeta->tbName);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  // A "both" response is expected to describe a super table in tbMeta.
  if (CTG_IS_META_BOTH(pMeta->metaType) && TSDB_SUPER_TABLE != pMeta->tbMeta->tableType) {
    ctgError("table type error, expected:%d, actual:%d", TSDB_SUPER_TABLE, pMeta->tbMeta->tableType);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  CTG_ERR_JRET(ctgGetAddDBCache(pCtg, pMeta->dbFName, pMeta->dbId, &dbCache));
  if (NULL == dbCache) {
    ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:0x%" PRIx64, pMeta->dbFName, pMeta->dbId);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  if (CTG_IS_META_TABLE(pMeta->metaType) || CTG_IS_META_BOTH(pMeta->metaType)) {
    int32_t tbMetaSize = CTG_META_SIZE(pMeta->tbMeta);

    code = ctgWriteTbMetaToCache(pCtg, dbCache, pMeta->dbFName, pMeta->dbId, pMeta->tbName, pMeta->tbMeta, tbMetaSize);
    // Detach tbMeta whatever the result, so the cleanup path does not free it
    // (presumably ctgWriteTbMetaToCache owns the buffer once called -- confirm).
    pMeta->tbMeta = NULL;
    CTG_ERR_JRET(code);
  }

  if (CTG_IS_META_CTABLE(pMeta->metaType) || CTG_IS_META_BOTH(pMeta->metaType)) {
    // The cache gets its own heap copy of the embedded child-table meta.
    SCTableMeta *ctbCopy = taosMemoryMalloc(sizeof(SCTableMeta));
    if (NULL == ctbCopy) {
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }

    memcpy(ctbCopy, &pMeta->ctbMeta, sizeof(SCTableMeta));
    CTG_ERR_JRET(ctgWriteTbMetaToCache(pCtg, dbCache, pMeta->dbFName, pMeta->dbId, pMeta->ctbName,
                                       (STableMeta *)ctbCopy, sizeof(SCTableMeta)));
  }

_return:

  taosMemoryFreeClear(pMeta->tbMeta);
  taosMemoryFreeClear(pMeta);

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
1717
/*
 * Cache operation: remove a super table from the stb cache, the table cache
 * and the stb rent list.
 *
 * operation->data is a SCtgDropStbMetaMsg and is always freed before
 * returning. The request is ignored when the db is not cached, or when the
 * message carries a non-zero dbId that differs from the cached one.
 *
 * Returns TSDB_CODE_SUCCESS or the error from ctgMetaRentRemove.
 */
int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) {
  int32_t             code = 0;
  SCtgDropStbMetaMsg *msg = operation->data;
  SCatalog           *pCtg = msg->pCtg;
  SCtgDBCache        *dbCache = NULL;
  SCtgTbCache        *pTbCache = NULL;

  ctgGetDBCache(pCtg, msg->dbFName, &dbCache);
  if (NULL == dbCache) {
    goto _return;
  }

  if (0 != msg->dbId && msg->dbId != dbCache->dbId) {
    ctgDebug("dbId already modified, dbFName:%s, current:0x%" PRIx64 ", dbId:0x%" PRIx64 ", stb:%s, suid:0x%" PRIx64,
             msg->dbFName, dbCache->dbId, msg->dbId, msg->stbName, msg->suid);
    goto _return;
  }

  // A non-zero result from taosHashRemove is treated as "entry was not there".
  if (0 == taosHashRemove(dbCache->stbCache, &msg->suid, sizeof(msg->suid))) {
    CTG_CACHE_STAT_DEC(numOfStb, 1);
  } else {
    ctgDebug("stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:0x%" PRIx64, msg->dbFName,
             msg->stbName, msg->suid);
  }

  size_t stbNameLen = strlen(msg->stbName);

  pTbCache = taosHashGet(dbCache->tbCache, msg->stbName, stbNameLen);
  if (NULL == pTbCache) {
    ctgDebug("stb %s already not in cache", msg->stbName);
    goto _return;
  }

  // Release the entry's content under its write lock before the entry itself
  // is taken out of the hash.
  CTG_LOCK(CTG_WRITE, &pTbCache->metaLock);
  ctgFreeTbCacheImpl(pTbCache);
  CTG_UNLOCK(CTG_WRITE, &pTbCache->metaLock);

  if (0 == taosHashRemove(dbCache->tbCache, msg->stbName, stbNameLen)) {
    CTG_CACHE_STAT_DEC(numOfTbl, 1);
  } else {
    ctgError("stb not exist in cache, dbFName:%s, stb:%s, suid:0x%" PRIx64, msg->dbFName, msg->stbName, msg->suid);
  }

  ctgInfo("stb removed from cache, dbFName:%s, stbName:%s, suid:0x%" PRIx64, msg->dbFName, msg->stbName, msg->suid);

  CTG_ERR_JRET(ctgMetaRentRemove(&pCtg->stbRent, msg->suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare));

  ctgDebug("stb removed from rent, dbFName:%s, stbName:%s, suid:0x%" PRIx64, msg->dbFName, msg->stbName, msg->suid);

_return:

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
1770
/*
 * Cache operation: remove one table's entry from the db's table cache.
 *
 * operation->data is a SCtgDropTblMetaMsg and is always freed before
 * returning. The request is ignored when the db is not cached, when the
 * cached dbId differs from the message's dbId, or when the table has no
 * cache entry.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CTG_INTERNAL_ERROR when the entry
 * was found but could not be removed from the hash.
 */
int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) {
  int32_t             code = 0;
  SCtgDropTblMetaMsg *msg = operation->data;
  SCatalog           *pCtg = msg->pCtg;
  SCtgDBCache        *dbCache = NULL;
  SCtgTbCache        *pTbCache = NULL;

  ctgGetDBCache(pCtg, msg->dbFName, &dbCache);
  if (NULL == dbCache) {
    goto _return;
  }

  if (msg->dbId != dbCache->dbId) {
    ctgDebug("dbId 0x%" PRIx64 " not match with curId 0x%" PRIx64 ", dbFName:%s, tbName:%s", msg->dbId, dbCache->dbId,
             msg->dbFName, msg->tbName);
    goto _return;
  }

  size_t tbNameLen = strlen(msg->tbName);

  pTbCache = taosHashGet(dbCache->tbCache, msg->tbName, tbNameLen);
  if (NULL == pTbCache) {
    ctgDebug("tb %s already not in cache", msg->tbName);
    goto _return;
  }

  // Release the entry's content under its write lock before the entry itself
  // is taken out of the hash.
  CTG_LOCK(CTG_WRITE, &pTbCache->metaLock);
  ctgFreeTbCacheImpl(pTbCache);
  CTG_UNLOCK(CTG_WRITE, &pTbCache->metaLock);

  if (0 != taosHashRemove(dbCache->tbCache, msg->tbName, tbNameLen)) {
    ctgError("tb %s not exist in cache, dbFName:%s", msg->tbName, msg->dbFName);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  CTG_CACHE_STAT_DEC(numOfTbl, 1);

  ctgDebug("table %s removed from cache, dbFName:%s", msg->tbName, msg->dbFName);

_return:

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
1813
/*
 * Cache operation: insert or refresh one user's auth info in pCtg->userCache.
 *
 * operation->data is a SCtgUpdateUserMsg and is always freed before
 * returning. The createdDbs/readDbs/writeDbs hashes carried by the message
 * are handed over to the cache on success; whatever is still attached to the
 * message at exit is cleaned up here.
 *
 * For a user that is already cached, the version, the super-user flag and the
 * three db sets are all replaced under the entry's write lock, so a change of
 * the super-user flag is no longer lost on refresh.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when a new entry
 * cannot be stored.
 */
int32_t ctgOpUpdateUser(SCtgCacheOperation *operation) {
  int32_t            code = 0;
  SCtgUpdateUserMsg *msg = operation->data;
  SCatalog          *pCtg = msg->pCtg;

  SCtgUserAuth *pUser = (SCtgUserAuth *)taosHashGet(pCtg->userCache, msg->userAuth.user, strlen(msg->userAuth.user));
  if (NULL == pUser) {
    // First sighting of this user: store a fresh entry that shares the
    // message's hash pointers.
    SCtgUserAuth userAuth = {0};

    userAuth.version = msg->userAuth.version;
    userAuth.superUser = msg->userAuth.superAuth;
    userAuth.createdDbs = msg->userAuth.createdDbs;
    userAuth.readDbs = msg->userAuth.readDbs;
    userAuth.writeDbs = msg->userAuth.writeDbs;

    if (taosHashPut(pCtg->userCache, msg->userAuth.user, strlen(msg->userAuth.user), &userAuth, sizeof(userAuth))) {
      ctgError("taosHashPut user %s to cache failed", msg->userAuth.user);
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }

    // The stored entry now references the hashes; detach them from the
    // message so the shared cleanup below leaves them alone.
    msg->userAuth.createdDbs = NULL;
    msg->userAuth.readDbs = NULL;
    msg->userAuth.writeDbs = NULL;

    goto _return;
  }

  CTG_LOCK(CTG_WRITE, &pUser->lock);

  // Scalar fields are refreshed together with the db sets so the entry never
  // shows a new version next to stale data.
  pUser->version = msg->userAuth.version;
  pUser->superUser = msg->userAuth.superAuth;

  taosHashCleanup(pUser->createdDbs);
  pUser->createdDbs = msg->userAuth.createdDbs;
  msg->userAuth.createdDbs = NULL;

  taosHashCleanup(pUser->readDbs);
  pUser->readDbs = msg->userAuth.readDbs;
  msg->userAuth.readDbs = NULL;

  taosHashCleanup(pUser->writeDbs);
  pUser->writeDbs = msg->userAuth.writeDbs;
  msg->userAuth.writeDbs = NULL;

  CTG_UNLOCK(CTG_WRITE, &pUser->lock);

_return:

  // Only hashes that were NOT handed over are still attached here.
  taosHashCleanup(msg->userAuth.createdDbs);
  taosHashCleanup(msg->userAuth.readDbs);
  taosHashCleanup(msg->userAuth.writeDbs);

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
1867
/*
 * Cache operation: replace the endpoint set of one vgroup in the cached
 * vgroup info of a db.
 *
 * operation->data is a SCtgUpdateEpsetMsg and is always freed before
 * returning. The update is ignored (returning success) when the db is not
 * cached, when no vgroup info is cached for it, or when the vgroup id is
 * unknown.
 *
 * Returns TSDB_CODE_SUCCESS or the error from ctgGetDBCache / ctgWLockVgInfo.
 */
int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation) {
  int32_t             code = 0;
  SCtgUpdateEpsetMsg *msg = operation->data;
  SCatalog           *pCtg = msg->pCtg;
  SCtgDBCache        *dbCache = NULL;
  SDBVgInfo          *vgInfo = NULL;
  SVgroupInfo        *pInfo = NULL;
  // Tracks whether the vgInfo write lock is actually held. The cleanup path
  // used to unlock whenever dbCache was non-NULL, which also unlocked after a
  // failed ctgWLockVgInfo (other handlers in this file do not unlock on that
  // failure, so the lock is presumably not held then).
  bool                vgLocked = false;

  CTG_ERR_JRET(ctgGetDBCache(pCtg, msg->dbFName, &dbCache));
  if (NULL == dbCache) {
    ctgDebug("db %s not exist, ignore epset update", msg->dbFName);
    goto _return;
  }

  CTG_ERR_JRET(ctgWLockVgInfo(pCtg, dbCache));
  vgLocked = true;

  vgInfo = dbCache->vgCache.vgInfo;
  if (NULL == vgInfo) {
    ctgDebug("vgroup in db %s not cached, ignore epset update", msg->dbFName);
    goto _return;
  }

  pInfo = taosHashGet(vgInfo->vgHash, &msg->vgId, sizeof(msg->vgId));
  if (NULL == pInfo) {
    ctgDebug("no vgroup %d in db %s, ignore epset update", msg->vgId, msg->dbFName);
    goto _return;
  }

  SEp *pOrigEp = &pInfo->epSet.eps[pInfo->epSet.inUse];
  SEp *pNewEp = &msg->epSet.eps[msg->epSet.inUse];
  ctgDebug("vgroup %d epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d, dbFName:%s in ctg", pInfo->vgId,
           pInfo->epSet.inUse, pInfo->epSet.numOfEps, pOrigEp->fqdn, pOrigEp->port, msg->epSet.inUse,
           msg->epSet.numOfEps, pNewEp->fqdn, pNewEp->port, msg->dbFName);

  pInfo->epSet = msg->epSet;

_return:

  if (vgLocked) {
    ctgWUnlockVgInfo(dbCache);
  }

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
1912
/*
 * Cache operation: store a table's index info in the cache.
 *
 * operation->data is a SCtgUpdateTbIndexMsg and is always freed before
 * returning. msg->pIndex is passed by address to ctgWriteTbIndexToCache; if
 * the local pointer is still non-NULL afterwards (the write did not take it),
 * the index array and the index object are released here.
 *
 * Returns TSDB_CODE_SUCCESS or the first error hit.
 */
int32_t ctgOpUpdateTbIndex(SCtgCacheOperation *operation) {
  int32_t               code = 0;
  SCtgUpdateTbIndexMsg *msg = operation->data;
  SCatalog             *pCtg = msg->pCtg;
  STableIndex          *pIndex = msg->pIndex;
  SCtgDBCache          *dbCache = NULL;

  CTG_ERR_JRET(ctgGetAddDBCache(pCtg, pIndex->dbFName, 0, &dbCache));
  // Same guard the other ctgGetAddDBCache callers in this file apply: a
  // successful call may still leave dbCache NULL, and the write below must
  // not be handed a NULL db cache.
  if (NULL == dbCache) {
    ctgInfo("conflict db update, ignore this update, dbFName:%s", pIndex->dbFName);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  CTG_ERR_JRET(ctgWriteTbIndexToCache(pCtg, dbCache, pIndex->dbFName, pIndex->tbName, &pIndex));

_return:

  if (pIndex) {
    taosArrayDestroyEx(pIndex->pIndex, tFreeSTableIndexInfo);
    taosMemoryFreeClear(pIndex);
  }

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

/*
 * Cache operation: drop a table's cached index info by overwriting it with an
 * empty placeholder whose version is -1.
 *
 * operation->data is a SCtgDropTbIndexMsg and is always freed before
 * returning, including when the db is not cached (that path used to return
 * early and leak the message).
 *
 * Returns TSDB_CODE_SUCCESS (also when the db is not cached),
 * TSDB_CODE_OUT_OF_MEMORY, or the error from ctgGetDBCache /
 * ctgWriteTbIndexToCache.
 */
int32_t ctgOpDropTbIndex(SCtgCacheOperation *operation) {
  int32_t             code = 0;
  SCtgDropTbIndexMsg *msg = operation->data;
  SCatalog           *pCtg = msg->pCtg;
  SCtgDBCache        *dbCache = NULL;
  // Declared and initialised before the first jump to _return: the cleanup
  // path tests pIndex, so it must never be read while indeterminate.
  STableIndex        *pIndex = NULL;

  CTG_ERR_JRET(ctgGetDBCache(pCtg, msg->dbFName, &dbCache));
  if (NULL == dbCache) {
    // Nothing cached for this db; code is still 0, so this reports success.
    goto _return;
  }

  pIndex = taosMemoryCalloc(1, sizeof(STableIndex));
  if (NULL == pIndex) {
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  // Bounded copies, matching the tstrncpy usage elsewhere in this file.
  tstrncpy(pIndex->tbName, msg->tbName, sizeof(pIndex->tbName));
  tstrncpy(pIndex->dbFName, msg->dbFName, sizeof(pIndex->dbFName));
  pIndex->version = -1;

  CTG_ERR_JRET(ctgWriteTbIndexToCache(pCtg, dbCache, pIndex->dbFName, pIndex->tbName, &pIndex));

_return:

  // pIndex is still non-NULL only if the cache write did not take it.
  if (pIndex) {
    taosArrayDestroyEx(pIndex->pIndex, tFreeSTableIndexInfo);
    taosMemoryFreeClear(pIndex);
  }

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
1968
/*
 * Cache operation: clear or free catalog handles while holding the global
 * gCtgMgmt.lock for writing.
 *
 * operation->data is a SCtgClearCacheMsg and is freed before returning.
 * With a non-NULL msg->pCtg only that handle is processed; otherwise every
 * registered instance is. msg->freeCtg selects "free" over "clear".
 *
 * Always returns TSDB_CODE_SUCCESS (code is never changed from 0).
 */
int32_t ctgOpClearCache(SCtgCacheOperation *operation) {
  int32_t            code = 0;
  SCtgClearCacheMsg *msg = operation->data;
  SCatalog          *pCtg = msg->pCtg;

  CTG_LOCK(CTG_WRITE, &gCtgMgmt.lock);

  if (NULL != pCtg) {
    // Single-handle request.
    if (msg->freeCtg) {
      ctgFreeHandle(pCtg);
    } else {
      ctgClearHandle(pCtg);
    }
  } else if (msg->freeCtg) {
    ctgFreeAllInstance();
  } else {
    ctgClearAllInstance();
  }

  CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.lock);

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
2000
void ctgCleanupCacheQueue(void) {
dengyihao's avatar
dengyihao 已提交
2001 2002
  SCtgQNode          *node = NULL;
  SCtgQNode          *nodeNext = NULL;
D
dapan1121 已提交
2003
  SCtgCacheOperation *op = NULL;
dengyihao's avatar
dengyihao 已提交
2004
  bool                stopQueue = false;
D
dapan1121 已提交
2005 2006 2007 2008 2009

  while (true) {
    node = gCtgMgmt.queue.head->next;
    while (node) {
      if (node->op) {
D
dapan1121 已提交
2010 2011 2012 2013 2014 2015
        op = node->op;
        if (op->stopQueue) {
          SCatalog *pCtg = ((SCtgUpdateMsgHeader *)op->data)->pCtg;
          ctgDebug("process [%s] operation", gCtgCacheOperation[op->opId].name);
          (*gCtgCacheOperation[op->opId].func)(op);
          stopQueue = true;
dengyihao's avatar
dengyihao 已提交
2016
          CTG_RT_STAT_INC(numOfOpDequeue, 1);
D
dapan1121 已提交
2017 2018
        } else {
          taosMemoryFree(op->data);
dengyihao's avatar
dengyihao 已提交
2019
          CTG_RT_STAT_INC(numOfOpAbort, 1);
D
dapan1121 已提交
2020
        }
dengyihao's avatar
dengyihao 已提交
2021

D
dapan1121 已提交
2022 2023
        if (op->syncOp) {
          tsem_post(&op->rspSem);
D
dapan1121 已提交
2024
        } else {
D
dapan1121 已提交
2025
          taosMemoryFree(op);
D
dapan1121 已提交
2026
        }
D
dapan1121 已提交
2027
      }
D
dapan1121 已提交
2028 2029 2030

      nodeNext = node->next;
      taosMemoryFree(node);
dengyihao's avatar
dengyihao 已提交
2031

D
dapan1121 已提交
2032
      node = nodeNext;
D
dapan1121 已提交
2033 2034
    }

D
dapan1121 已提交
2035
    if (!stopQueue) {
D
dapan1121 已提交
2036 2037 2038 2039
      taosUsleep(1);
    } else {
      break;
    }
D
dapan1121 已提交
2040 2041 2042 2043 2044 2045
  }

  taosMemoryFreeClear(gCtgMgmt.queue.head);
  gCtgMgmt.queue.tail = NULL;
}

dengyihao's avatar
dengyihao 已提交
2046
void *ctgUpdateThreadFunc(void *param) {
D
dapan1121 已提交
2047
  setThreadName("catalog");
2048

D
dapan1121 已提交
2049 2050 2051 2052 2053 2054
  qInfo("catalog update thread started");

  while (true) {
    if (tsem_wait(&gCtgMgmt.queue.reqSem)) {
      qError("ctg tsem_wait failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
    }
dengyihao's avatar
dengyihao 已提交
2055 2056

    if (atomic_load_8((int8_t *)&gCtgMgmt.exit)) {
D
dapan1121 已提交
2057
      ctgCleanupCacheQueue();
D
dapan1121 已提交
2058 2059 2060
      break;
    }

D
dapan1121 已提交
2061 2062 2063
    SCtgCacheOperation *operation = NULL;
    ctgDequeue(&operation);
    SCatalog *pCtg = ((SCtgUpdateMsgHeader *)operation->data)->pCtg;
D
dapan1121 已提交
2064

D
dapan1121 已提交
2065
    ctgDebug("process [%s] operation", gCtgCacheOperation[operation->opId].name);
dengyihao's avatar
dengyihao 已提交
2066

D
dapan1121 已提交
2067
    (*gCtgCacheOperation[operation->opId].func)(operation);
D
dapan1121 已提交
2068

D
dapan1121 已提交
2069
    if (operation->syncOp) {
D
dapan1121 已提交
2070
      tsem_post(&operation->rspSem);
D
dapan1121 已提交
2071 2072
    } else {
      taosMemoryFreeClear(operation);
D
dapan1121 已提交
2073 2074
    }

dengyihao's avatar
dengyihao 已提交
2075
    CTG_RT_STAT_INC(numOfOpDequeue, 1);
D
dapan1121 已提交
2076

D
dapan1121 已提交
2077
    ctgdShowCacheInfo();
D
dapan1121 已提交
2078 2079 2080 2081
    ctgdShowClusterCache(pCtg);
  }

  qInfo("catalog update thread stopped");
dengyihao's avatar
dengyihao 已提交
2082

D
dapan1121 已提交
2083 2084 2085 2086 2087 2088 2089 2090 2091 2092 2093 2094
  return NULL;
}

/*
 * Create the joinable catalog cache update thread (ctgUpdateThreadFunc) and
 * store its handle in gCtgMgmt.updateThread.
 *
 * Returns TSDB_CODE_SUCCESS, or the system error derived from errno when the
 * thread cannot be created (terrno is set to the same value).
 */
int32_t ctgStartUpdateThread() {
  int32_t      code = 0;
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

  if (taosThreadCreate(&gCtgMgmt.updateThread, &thAttr, ctgUpdateThreadFunc, NULL) != 0) {
    // Capture errno right away, before another library call can overwrite it.
    code = TAOS_SYSTEM_ERROR(errno);
    terrno = code;
  }

  // The attribute object is only needed for the create call; release it on
  // the failure path as well (it used to be skipped there).
  taosThreadAttrDestroy(&thAttr);

  CTG_ERR_RET(code);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2100
/*
 * Look a table's meta up in the cache.
 *
 * pCtg       - catalog handle
 * pConn      - not used by this function
 * ctx        - lookup context; ctx->flag may be updated (system-db bit, and
 *              the stb bits when the table kind was unknown)
 * pTableMeta - out: the cached meta when it is usable, otherwise left NULL
 *
 * A cached meta is returned only when it matches the stb expectation in
 * ctx->flag and either no forced update is requested or the db is a system
 * db. An unusable cached meta is freed and *pTableMeta cleared.
 *
 * Returns TSDB_CODE_SUCCESS or the error from ctgReadTbMetaFromCache.
 */
int32_t ctgGetTbMetaFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMetaCtx *ctx, STableMeta **pTableMeta) {
  if (IS_SYS_DBNAME(ctx->pName->dbname)) {
    CTG_FLAG_SET_SYS_DB(ctx->flag);
  }

  CTG_ERR_RET(ctgReadTbMetaFromCache(pCtg, ctx, pTableMeta));

  if (NULL != *pTableMeta) {
    if (CTG_FLAG_MATCH_STB(ctx->flag, (*pTableMeta)->tableType)) {
      // Cache hit is good enough unless a forced refresh applies to a
      // non-system db.
      if (!CTG_FLAG_IS_FORCE_UPDATE(ctx->flag) || CTG_FLAG_IS_SYS_DB(ctx->flag)) {
        return TSDB_CODE_SUCCESS;
      }
    }

    taosMemoryFreeClear(*pTableMeta);
  }

  if (CTG_FLAG_IS_UNKNOWN_STB(ctx->flag)) {
    CTG_FLAG_SET_STB(ctx->flag, ctx->tbInfo.tbType);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
2123
#if 0
/*
 * Compiled out by the surrounding #if 0.
 *
 * Batch lookup: for every name in ctx->pNames, read the table meta from the
 * cache. A usable hit is stored in the result entry; a miss adds a fetch
 * record to ctx->pFetchs. One result entry per name is pushed to
 * ctx->pResList, which is swapped into *pResList when nothing has to be
 * fetched.
 */
int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetasCtx* ctx, SArray** pResList) {
  int32_t tbNum = taosArrayGetSize(ctx->pNames);
  SName*  fName = taosArrayGet(ctx->pNames, 0);
  int32_t fIdx = 0;

  for (int32_t i = 0; i < tbNum; ++i) {
    SName*        pName = taosArrayGet(ctx->pNames, i);
    STableMeta   *pTableMeta = NULL;
    SCtgTbMetaCtx nctx = {0};

    nctx.flag = CTG_FLAG_UNKNOWN_STB;
    nctx.pName = pName;

    if (IS_SYS_DBNAME(pName->dbname)) {
      CTG_FLAG_SET_SYS_DB(nctx.flag);
    }

    CTG_ERR_RET(ctgReadTbMetaFromCache(pCtg, &nctx, &pTableMeta));
    SMetaRes res = {0};

    if (NULL != pTableMeta) {
      if (CTG_FLAG_MATCH_STB(nctx.flag, pTableMeta->tableType) &&
          ((!CTG_FLAG_IS_FORCE_UPDATE(nctx.flag)) || (CTG_FLAG_IS_SYS_DB(nctx.flag)))) {
        res.pRes = pTableMeta;
      } else {
        taosMemoryFreeClear(pTableMeta);
      }
    }

    if (NULL == res.pRes) {
      // Cache miss: record what has to be fetched for this table.
      if (NULL == ctx->pFetchs) {
        ctx->pFetchs = taosArrayInit(tbNum, sizeof(SCtgFetch));
      }

      if (CTG_FLAG_IS_UNKNOWN_STB(nctx.flag)) {
        CTG_FLAG_SET_STB(nctx.flag, nctx.tbInfo.tbType);
      }

      SCtgFetch fetch = {0};
      fetch.tbIdx = i;
      fetch.fetchIdx = fIdx++;
      fetch.flag = nctx.flag;

      taosArrayPush(ctx->pFetchs, &fetch);
    }

    taosArrayPush(ctx->pResList, &res);
  }

  if (NULL == ctx->pFetchs) {
    TSWAP(*pResList, ctx->pResList);
  }

  return TSDB_CODE_SUCCESS;
}
#endif

dengyihao's avatar
dengyihao 已提交
2180 2181 2182 2183 2184 2185 2186 2187 2188
/**
 * Look up the metas of all tables in pList from the catalog cache. The DB name
 * is derived from the first entry only, so all entries are expected to belong
 * to the same DB.
 *
 * For every table exactly one slot is appended to ctx->pResList:
 *  - cache hit:  an SMetaRes whose pRes is a heap-allocated STableMeta copy;
 *  - cache miss: the list is grown by one slot via taosArraySetSize() and a
 *                fetch request is recorded through ctgAddFetch().
 *
 * A child table's meta is assembled from its own cached SCTableMeta part plus
 * the schema part of its super table. When consecutive child tables share the
 * same suid, the previously assembled meta is cloned instead of reading the
 * super table again.
 *
 * @param pCtg        catalog handle
 * @param pConn       connection info (not used by this function)
 * @param ctx         batch context; pResList and pFetchs are updated
 * @param dbIdx       forwarded to ctgAddFetch()
 * @param fetchIdx    forwarded to ctgAddFetch()
 * @param baseResIdx  base result index; baseResIdx + i is forwarded to ctgAddFetch()
 * @param pList       array of SName
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when a meta copy cannot
 *         be allocated. Entries already pushed to ctx->pResList are left in place.
 */
int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMetasCtx *ctx, int32_t dbIdx,
                               int32_t *fetchIdx, int32_t baseResIdx, SArray *pList) {
  int32_t     tbNum = taosArrayGetSize(pList);
  SName      *pName = taosArrayGet(pList, 0);
  char        dbFName[TSDB_DB_FNAME_LEN] = {0};
  int32_t     flag = CTG_FLAG_UNKNOWN_STB;
  uint64_t    lastSuid = 0;
  STableMeta *lastTableMeta = NULL;

  if (IS_SYS_DBNAME(pName->dbname)) {
    CTG_FLAG_SET_SYS_DB(flag);
    strcpy(dbFName, pName->dbname);
  } else {
    tNameGetFullDbName(pName, dbFName);
  }

  SCtgDBCache *dbCache = NULL;
  SCtgTbCache *pCache = NULL;
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);

  if (NULL == dbCache) {
    // whole DB is unknown to the cache: every table has to be fetched
    ctgDebug("db %s not in cache", dbFName);
    for (int32_t i = 0; i < tbNum; ++i) {
      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);
    }

    return TSDB_CODE_SUCCESS;
  }

  for (int32_t i = 0; i < tbNum; ++i) {
    pName = taosArrayGet(pList, i);

    pCache = taosHashAcquire(dbCache->tbCache, pName->tname, strlen(pName->tname));
    if (NULL == pCache) {
      ctgDebug("tb %s not in cache, dbFName:%s", pName->tname, dbFName);
      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);

      continue;
    }

    CTG_LOCK(CTG_READ, &pCache->metaLock);
    if (NULL == pCache->pMeta) {
      // drop the read lock and the hash reference before moving on, otherwise both leak
      CTG_UNLOCK(CTG_READ, &pCache->metaLock);
      taosHashRelease(dbCache->tbCache, pCache);

      ctgDebug("tb %s meta not in cache, dbFName:%s", pName->tname, dbFName);
      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);

      continue;
    }

    STableMeta *tbMeta = pCache->pMeta;

    // snapshot the fields needed after the cache entry has been released
    SCtgTbMetaCtx nctx = {0};
    nctx.flag = flag;
    nctx.tbInfo.inCache = true;
    nctx.tbInfo.dbId = dbCache->dbId;
    nctx.tbInfo.suid = tbMeta->suid;
    nctx.tbInfo.tbType = tbMeta->tableType;

    SMetaRes    res = {0};
    STableMeta *pTableMeta = NULL;
    if (tbMeta->tableType != TSDB_CHILD_TABLE) {
      // non-child tables carry their full meta: copy it as is
      int32_t metaSize = CTG_META_SIZE(tbMeta);
      pTableMeta = taosMemoryCalloc(1, metaSize);
      if (NULL == pTableMeta) {
        // NOTE(review): ctgReleaseTbMetaToCache is defined elsewhere; it is relied on here
        // to release the meta lock, the table entry and the db cache - confirm
        ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
        CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
      }

      memcpy(pTableMeta, tbMeta, metaSize);

      CTG_UNLOCK(CTG_READ, &pCache->metaLock);
      taosHashRelease(dbCache->tbCache, pCache);

      // tbMeta must not be touched after the release above; log from the snapshot
      ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pName->tname, nctx.tbInfo.tbType, dbFName);

      res.pRes = pTableMeta;
      taosArrayPush(ctx->pResList, &res);

      continue;
    }

    // PROCESS FOR CHILD TABLE

    if (lastSuid && tbMeta->suid == lastSuid && lastTableMeta) {
      // same super table as the previous child: reuse its schema, overwrite the child part
      cloneTableMeta(lastTableMeta, &pTableMeta);
      if (NULL == pTableMeta) {
        ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
        CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
      }

      memcpy(pTableMeta, tbMeta, sizeof(SCTableMeta));

      CTG_UNLOCK(CTG_READ, &pCache->metaLock);
      taosHashRelease(dbCache->tbCache, pCache);

      ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pName->tname, nctx.tbInfo.tbType, dbFName);

      res.pRes = pTableMeta;
      taosArrayPush(ctx->pResList, &res);

      continue;
    }

    // step 1: copy the child-table part
    int32_t metaSize = sizeof(SCTableMeta);
    pTableMeta = taosMemoryCalloc(1, metaSize);
    if (NULL == pTableMeta) {
      ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    memcpy(pTableMeta, tbMeta, metaSize);

    CTG_UNLOCK(CTG_READ, &pCache->metaLock);
    taosHashRelease(dbCache->tbCache, pCache);

    ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", pName->tname,
             nctx.tbInfo.tbType, dbFName);

    // step 2: resolve the super table name from its suid, then its cache entry
    char *stName = taosHashAcquire(dbCache->stbCache, &pTableMeta->suid, sizeof(pTableMeta->suid));
    if (NULL == stName) {
      ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", pTableMeta->suid, dbFName);
      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);

      taosMemoryFreeClear(pTableMeta);
      continue;
    }

    pCache = taosHashAcquire(dbCache->tbCache, stName, strlen(stName));
    if (NULL == pCache) {
      ctgDebug("stb 0x%" PRIx64 " name %s not in cache, dbFName:%s", pTableMeta->suid, stName, dbFName);
      taosHashRelease(dbCache->stbCache, stName);

      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);

      taosMemoryFreeClear(pTableMeta);
      continue;
    }

    taosHashRelease(dbCache->stbCache, stName);

    CTG_LOCK(CTG_READ, &pCache->metaLock);
    if (NULL == pCache->pMeta) {
      ctgDebug("stb 0x%" PRIx64 " meta not in cache, dbFName:%s", pTableMeta->suid, dbFName);
      CTG_UNLOCK(CTG_READ, &pCache->metaLock);
      taosHashRelease(dbCache->tbCache, pCache);

      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);

      taosMemoryFreeClear(pTableMeta);

      continue;
    }

    STableMeta *stbMeta = pCache->pMeta;
    if (stbMeta->suid != nctx.tbInfo.suid) {
      // log while stbMeta is still protected by the read lock
      ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%" PRIx64, stbMeta->suid,
               nctx.tbInfo.suid);

      CTG_UNLOCK(CTG_READ, &pCache->metaLock);
      taosHashRelease(dbCache->tbCache, pCache);

      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);

      taosMemoryFreeClear(pTableMeta);

      continue;
    }

    // step 3: grow the copy to full size and append the super table's schema part
    metaSize = CTG_META_SIZE(stbMeta);
    STableMeta *pFullMeta = taosMemoryRealloc(pTableMeta, metaSize);
    if (NULL == pFullMeta) {
      // the original buffer is still owned by us when realloc fails
      taosMemoryFreeClear(pTableMeta);
      ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    pTableMeta = pFullMeta;

    memcpy(&pTableMeta->sversion, &stbMeta->sversion, metaSize - sizeof(SCTableMeta));

    CTG_UNLOCK(CTG_READ, &pCache->metaLock);
    taosHashRelease(dbCache->tbCache, pCache);

    res.pRes = pTableMeta;
    taosArrayPush(ctx->pResList, &res);

    // remember this meta so following children of the same stb can clone it
    lastSuid = pTableMeta->suid;
    lastTableMeta = pTableMeta;
  }

  ctgReleaseDBCache(pCtg, dbCache);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2373
/**
 * Queue the removal of a table's meta from the catalog cache.
 *
 * The meta is first read from the cache to learn the table type: super tables
 * are dropped through ctgDropStbMetaEnqueue(), all other tables through
 * ctgDropTbMetaEnqueue(). A table that is not cached is treated as success.
 *
 * @param pCtg        catalog handle
 * @param pTableName  name of the table to drop
 * @param syncReq     forwarded to the enqueue call
 * @return TSDB_CODE_SUCCESS or the error code of the failing step
 */
int32_t ctgRemoveTbMetaFromCache(SCatalog *pCtg, SName *pTableName, bool syncReq) {
  int32_t       code = 0;
  STableMeta   *pCachedMeta = NULL;
  char          dbFName[TSDB_DB_FNAME_LEN];
  SCtgTbMetaCtx readCtx = {0};

  readCtx.pName = pTableName;
  readCtx.flag = CTG_FLAG_UNKNOWN_STB;

  CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &readCtx, &pCachedMeta));

  if (NULL == pCachedMeta) {
    ctgDebug("table already not in cache, db:%s, tblName:%s", pTableName->dbname, pTableName->tname);
    return TSDB_CODE_SUCCESS;
  }

  tNameGetFullDbName(pTableName, dbFName);

  if (TSDB_SUPER_TABLE != pCachedMeta->tableType) {
    CTG_ERR_JRET(ctgDropTbMetaEnqueue(pCtg, dbFName, readCtx.tbInfo.dbId, pTableName->tname, syncReq));
  } else {
    CTG_ERR_JRET(
        ctgDropStbMetaEnqueue(pCtg, dbFName, readCtx.tbInfo.dbId, pTableName->tname, pCachedMeta->suid, syncReq));
  }

_return:

  // the meta read above is a private copy; it is freed on every path through here
  taosMemoryFreeClear(pCachedMeta);

  CTG_RET(code);
}

/**
 * Resolve the vgroup a table hashes to, using only cached vgroup info.
 *
 * @param pCtg        catalog handle
 * @param pTableName  table name; system databases are rejected
 * @param pVgroup     out: newly allocated SVgroupInfo on success (caller frees),
 *                    NULL when the DB's vgroup info is not cached. Not written
 *                    when the function fails before the cache lookup completes.
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_CTG_INVALID_INPUT for a system DB,
 *         TSDB_CODE_OUT_OF_MEMORY, or the error of the failing lookup step
 */
int32_t ctgGetTbHashVgroupFromCache(SCatalog *pCtg, const SName *pTableName, SVgroupInfo **pVgroup) {
  if (IS_SYS_DBNAME(pTableName->dbname)) {
    ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname);
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  SCtgDBCache *dbCache = NULL;
  int32_t      code = 0;
  char         dbFName[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, dbFName);

  CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache));

  if (NULL == dbCache) {
    *pVgroup = NULL;
    return TSDB_CODE_SUCCESS;
  }

  *pVgroup = taosMemoryCalloc(1, sizeof(SVgroupInfo));
  if (NULL == *pVgroup) {
    // never hand a NULL output buffer to the hash lookup
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pTableName, *pVgroup));

_return:

  if (dbCache) {
    ctgReleaseVgInfoToCache(pCtg, dbCache);
  }

  if (code) {
    // on failure the caller must not see a partially filled result
    taosMemoryFreeClear(*pVgroup);
  }

  CTG_RET(code);
}