ctgCache.c 71.5 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "catalogInt.h"
dengyihao's avatar
dengyihao 已提交
17
#include "query.h"
D
dapan1121 已提交
18
#include "systable.h"
dengyihao's avatar
dengyihao 已提交
19 20
#include "tname.h"
#include "trpc.h"
D
dapan1121 已提交
21

dengyihao's avatar
dengyihao 已提交
22 23 24 25 26 27 28 29 30 31 32
/*
 * Table describing the catalog cache operations. Each entry lists an
 * operation id, a human-readable name, and a ctgOp* function.
 * ctgEnqueue() reads the name with gCtgCacheOperation[opId].name, so entries
 * are expected to sit at the index equal to their operation id.
 * NOTE(review): the third member is presumably the handler run for the
 * operation -- confirm against the SCtgOperation definition.
 */
SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = {{CTG_OP_UPDATE_VGROUP, "update vgInfo", ctgOpUpdateVgroup},
                                                {CTG_OP_UPDATE_TB_META, "update tbMeta", ctgOpUpdateTbMeta},
                                                {CTG_OP_DROP_DB_CACHE, "drop DB", ctgOpDropDbCache},
                                                {CTG_OP_DROP_DB_VGROUP, "drop DBVgroup", ctgOpDropDbVgroup},
                                                {CTG_OP_DROP_STB_META, "drop stbMeta", ctgOpDropStbMeta},
                                                {CTG_OP_DROP_TB_META, "drop tbMeta", ctgOpDropTbMeta},
                                                {CTG_OP_UPDATE_USER, "update user", ctgOpUpdateUser},
                                                {CTG_OP_UPDATE_VG_EPSET, "update epset", ctgOpUpdateEpset},
                                                {CTG_OP_UPDATE_TB_INDEX, "update tbIndex", ctgOpUpdateTbIndex},
                                                {CTG_OP_DROP_TB_INDEX, "drop tbIndex", ctgOpDropTbIndex},
                                                {CTG_OP_CLEAR_CACHE, "clear cache", ctgOpClearCache}};
D
dapan1121 已提交
33

D
dapan1121 已提交
34 35
/*
 * Take the read lock on a DB's vgroup cache and report whether vgroup info
 * is present.
 *
 * If the DB entry is flagged deleted, or it carries no vgInfo, the lock is
 * dropped again before returning and *inCache is set to false. Otherwise
 * *inCache is set to true and the read lock is still held on return.
 *
 * pCtg     catalog handle
 * dbCache  DB cache entry whose vgCache.vgLock is taken
 * inCache  out: true only when vgInfo exists and the lock remains held
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgRLockVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache, bool *inCache) {
  CTG_LOCK(CTG_READ, &dbCache->vgCache.vgLock);

  const bool dropping = dbCache->deleted;
  if (dropping || NULL == dbCache->vgCache.vgInfo) {
    CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock);
    *inCache = false;

    if (dropping) {
      ctgDebug("db is dropping, dbId:0x%" PRIx64, dbCache->dbId);
    } else {
      ctgDebug("db vgInfo is empty, dbId:0x%" PRIx64, dbCache->dbId);
    }

    return TSDB_CODE_SUCCESS;
  }

  *inCache = true;
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
59 60
/*
 * Take the write lock on a DB's vgroup cache.
 *
 * When the DB entry is flagged deleted the lock is released again and
 * TSDB_CODE_CTG_DB_DROPPED is returned through CTG_ERR_RET; otherwise the
 * write lock is held on return and TSDB_CODE_SUCCESS is returned.
 */
int32_t ctgWLockVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache) {
  CTG_LOCK(CTG_WRITE, &dbCache->vgCache.vgLock);

  if (!dbCache->deleted) {
    return TSDB_CODE_SUCCESS;
  }

  ctgDebug("db is dropping, dbId:0x%" PRIx64, dbCache->dbId);
  CTG_UNLOCK(CTG_WRITE, &dbCache->vgCache.vgLock);
  CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
71
/* Release the read lock on the DB's vgroup cache (dbCache->vgCache.vgLock). */
void ctgRUnlockVgInfo(SCtgDBCache *dbCache) {
  CTG_UNLOCK(CTG_READ, &dbCache->vgCache.vgLock);
}
D
dapan1121 已提交
72

dengyihao's avatar
dengyihao 已提交
73
/* Release the write lock on the DB's vgroup cache (dbCache->vgCache.vgLock). */
void ctgWUnlockVgInfo(SCtgDBCache *dbCache) {
  CTG_UNLOCK(CTG_WRITE, &dbCache->vgCache.vgLock);
}
D
dapan1121 已提交
74

dengyihao's avatar
dengyihao 已提交
75 76
/*
 * Give back a DB cache entry obtained with acquire semantics: drop the read
 * lock on dbCache->dbLock, then release the entry to the pCtg->dbCache hash.
 */
void ctgReleaseDBCache(SCatalog *pCtg, SCtgDBCache *dbCache) {
  CTG_UNLOCK(CTG_READ, &dbCache->dbLock);

  taosHashRelease(pCtg->dbCache, dbCache);
}
D
dapan1121 已提交
79

dengyihao's avatar
dengyihao 已提交
80
/*
 * Look up a DB entry in the catalog's DB cache.
 *
 * If the name contains a '.' and the part after the first '.' is a system DB
 * name, that suffix is used as the lookup key instead of the full name.
 *
 * pCtg     catalog handle owning the dbCache hash
 * dbFName  DB name to look up
 * pCache   out: the entry, or NULL when it is absent or flagged deleted
 * acquire  true:  use taosHashAcquire and take the read lock on dbLock; the
 *                 caller must later call ctgReleaseDBCache()
 *          false: plain taosHashGet, no lock is taken
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgAcquireDBCacheImpl(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache, bool acquire) {
  const char *dot = strchr(dbFName, '.');
  if (dot && IS_SYS_DBNAME(dot + 1)) {
    dbFName = dot + 1;
  }

  const size_t keyLen = strlen(dbFName);
  SCtgDBCache *entry = (SCtgDBCache *)(acquire ? taosHashAcquire(pCtg->dbCache, dbFName, keyLen)
                                               : taosHashGet(pCtg->dbCache, dbFName, keyLen));
  if (NULL == entry) {
    *pCache = NULL;
    ctgDebug("db not in cache, dbFName:%s", dbFName);
    return TSDB_CODE_SUCCESS;
  }

  if (acquire) {
    CTG_LOCK(CTG_READ, &entry->dbLock);
  }

  if (!entry->deleted) {
    *pCache = entry;
    return TSDB_CODE_SUCCESS;
  }

  // The entry is being removed: undo the acquire (if any) and report a miss.
  if (acquire) {
    ctgReleaseDBCache(pCtg, entry);
  }

  *pCache = NULL;
  ctgDebug("db is removing from cache, dbFName:%s", dbFName);
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
119
/*
 * Acquire a DB cache entry: forwards to ctgAcquireDBCacheImpl() with
 * acquire = true. A non-NULL *pCache must be given back with
 * ctgReleaseDBCache().
 */
int32_t ctgAcquireDBCache(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache) {
  CTG_RET(ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, true));
}

dengyihao's avatar
dengyihao 已提交
123
/*
 * Look up a DB cache entry without acquiring it: forwards to
 * ctgAcquireDBCacheImpl() with acquire = false, so no lock is taken and
 * nothing has to be released afterwards.
 */
int32_t ctgGetDBCache(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache) {
  CTG_RET(ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, false));
}

dengyihao's avatar
dengyihao 已提交
127
/*
 * Counterpart of ctgAcquireVgInfoFromCache(): drop the vgroup read lock
 * first, then release the DB cache entry itself.
 */
void ctgReleaseVgInfoToCache(SCatalog *pCtg, SCtgDBCache *dbCache) {
  ctgRUnlockVgInfo(dbCache);

  ctgReleaseDBCache(pCtg, dbCache);
}
D
dapan1121 已提交
131

dengyihao's avatar
dengyihao 已提交
132
/*
 * Release a table-meta handle and/or a DB handle; either pointer may be NULL.
 *
 * pCache   table cache entry: its metaLock read lock is dropped and the entry
 *          is released to dbCache->tbCache. dbCache must be non-NULL whenever
 *          pCache is non-NULL, because dbCache->tbCache is dereferenced.
 * dbCache  DB cache entry, released through ctgReleaseDBCache()
 */
void ctgReleaseTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache) {
  if (NULL != pCache) {
    CTG_UNLOCK(CTG_READ, &pCache->metaLock);
    taosHashRelease(dbCache->tbCache, pCache);
  }

  if (NULL != dbCache) {
    ctgReleaseDBCache(pCtg, dbCache);
  }
}
D
dapan1121 已提交
142

dengyihao's avatar
dengyihao 已提交
143
/*
 * Release a table-index handle and/or a DB handle; either pointer may be NULL.
 *
 * pCache   table cache entry: its indexLock read lock is dropped and the
 *          entry is released to dbCache->tbCache. dbCache must be non-NULL
 *          whenever pCache is non-NULL, because dbCache->tbCache is
 *          dereferenced.
 * dbCache  DB cache entry, released through ctgReleaseDBCache()
 */
void ctgReleaseTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, SCtgTbCache *pCache) {
  if (NULL != pCache) {
    CTG_UNLOCK(CTG_READ, &pCache->indexLock);
    taosHashRelease(dbCache->tbCache, pCache);
  }

  if (NULL != dbCache) {
    ctgReleaseDBCache(pCtg, dbCache);
  }
}

dengyihao's avatar
dengyihao 已提交
154
/*
 * Acquire a DB entry together with a read lock on its vgroup info.
 *
 * Hit:  *pCache is the DB entry; the caller owns the DB acquisition and the
 *       vgroup read lock and gives both back with ctgReleaseVgInfoToCache().
 *       The numOfVgHit statistic is incremented.
 * Miss: (DB absent, or DB present without vgInfo) nothing stays held,
 *       *pCache is NULL and numOfVgMiss is incremented.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgAcquireVgInfoFromCache(SCatalog *pCtg, const char *dbFName, SCtgDBCache **pCache) {
  SCtgDBCache *dbEntry = NULL;
  bool         vgCached = false;

  ctgAcquireDBCache(pCtg, dbFName, &dbEntry);
  if (NULL != dbEntry) {
    ctgRLockVgInfo(pCtg, dbEntry, &vgCached);
    if (vgCached) {
      *pCache = dbEntry;

      CTG_CACHE_STAT_INC(numOfVgHit, 1);

      ctgDebug("Got db vgInfo from cache, dbFName:%s", dbFName);

      return TSDB_CODE_SUCCESS;
    }

    // ctgRLockVgInfo() already dropped the vg lock; only the DB entry is left.
    ctgDebug("vgInfo of db %s not in cache", dbFName);
    ctgReleaseDBCache(pCtg, dbEntry);
  } else {
    ctgDebug("db %s not in cache", dbFName);
  }

  *pCache = NULL;

  CTG_CACHE_STAT_INC(numOfVgMiss, 1);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
190
/*
 * Acquire a table's cached meta together with its DB entry.
 *
 * Hit:  *pDb / *pTb are set, the table's metaLock is read-locked and both
 *       entries are acquired; the caller gives everything back with
 *       ctgReleaseTbMetaToCache(). numOfMetaHit is incremented.
 * Miss: (DB absent, table absent, or table entry without pMeta) everything
 *       taken here is released, *pDb and *pTb are set to NULL and
 *       numOfMetaMiss is incremented.
 *
 * The outputs are reset on a miss (as ctgAcquireStbMetaFromCache() does) so
 * that a caller reusing its variables never releases a stale handle twice.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgAcquireTbMetaFromCache(SCatalog *pCtg, char *dbFName, char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb) {
  SCtgDBCache *dbCache = NULL;
  SCtgTbCache *pCache = NULL;
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
  if (NULL == dbCache) {
    ctgDebug("db %s not in cache", dbFName);
    goto _return;
  }

  pCache = taosHashAcquire(dbCache->tbCache, tbName, strlen(tbName));
  if (NULL == pCache) {
    ctgDebug("tb %s not in cache, dbFName:%s", tbName, dbFName);
    goto _return;
  }

  CTG_LOCK(CTG_READ, &pCache->metaLock);
  if (NULL == pCache->pMeta) {
    ctgDebug("tb %s meta not in cache, dbFName:%s", tbName, dbFName);
    goto _return;
  }

  *pDb = dbCache;
  *pTb = pCache;

  ctgDebug("tb %s meta got in cache, dbFName:%s", tbName, dbFName);

  CTG_CACHE_STAT_INC(numOfMetaHit, 1);

  return TSDB_CODE_SUCCESS;

_return:

  // Handles NULL for either argument, so it covers every miss path above.
  ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);

  CTG_CACHE_STAT_INC(numOfMetaMiss, 1);

  *pDb = NULL;
  *pTb = NULL;

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
229 230 231
/*
 * Acquire a super table's cached meta by suid, together with its DB entry.
 *
 * The suid is first mapped to a name through dbCache->stbCache, then the name
 * is looked up in dbCache->tbCache.
 *
 * Hit:  *pDb / *pTb are set, the table's metaLock is read-locked; the caller
 *       gives everything back with ctgReleaseTbMetaToCache(). numOfMetaHit is
 *       incremented.
 * Miss: everything taken here is released, *pDb and *pTb are set to NULL and
 *       numOfMetaMiss is incremented.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgAcquireStbMetaFromCache(SCatalog *pCtg, char *dbFName, uint64_t suid, SCtgDBCache **pDb, SCtgTbCache **pTb) {
  SCtgDBCache *dbEntry = NULL;
  SCtgTbCache *stbEntry = NULL;
  char        *stbKey = NULL;

  ctgAcquireDBCache(pCtg, dbFName, &dbEntry);
  if (NULL == dbEntry) {
    ctgDebug("db %s not in cache", dbFName);
    goto _return;
  }

  stbKey = taosHashAcquire(dbEntry->stbCache, &suid, sizeof(suid));
  if (NULL == stbKey) {
    ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", suid, dbFName);
    goto _return;
  }

  stbEntry = taosHashAcquire(dbEntry->tbCache, stbKey, strlen(stbKey));
  if (NULL == stbEntry) {
    ctgDebug("stb 0x%" PRIx64 " name %s not in cache, dbFName:%s", suid, stbKey, dbFName);
  }

  // The name is only needed for the lookup (and the log line) above.
  taosHashRelease(dbEntry->stbCache, stbKey);

  if (NULL == stbEntry) {
    goto _return;
  }

  CTG_LOCK(CTG_READ, &stbEntry->metaLock);
  if (NULL == stbEntry->pMeta) {
    ctgDebug("stb 0x%" PRIx64 " meta not in cache, dbFName:%s", suid, dbFName);
    goto _return;
  }

  *pDb = dbEntry;
  *pTb = stbEntry;

  ctgDebug("stb 0x%" PRIx64 " meta got in cache, dbFName:%s", suid, dbFName);

  CTG_CACHE_STAT_INC(numOfMetaHit, 1);

  return TSDB_CODE_SUCCESS;

_return:

  ctgReleaseTbMetaToCache(pCtg, dbEntry, stbEntry);

  CTG_CACHE_STAT_INC(numOfMetaMiss, 1);

  *pDb = NULL;
  *pTb = NULL;

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322

/*
 * Variant of ctgAcquireStbMetaFromCache() for callers that already hold the
 * DB entry.
 *
 * Hit:  *pTb is set and its metaLock is read-locked; dbCache is left as the
 *       caller passed it. numOfMetaHit is incremented.
 * Miss: *pTb is set to NULL and numOfMetaMiss is incremented. Note that the
 *       miss path goes through ctgReleaseTbMetaToCache(), which ALSO releases
 *       the caller's dbCache -- after a miss the caller must not release
 *       dbCache again.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgAcquireStbMetaFromCache2(SCtgDBCache *dbCache, SCatalog *pCtg, char *dbFName, uint64_t suid, SCtgTbCache **pTb) {
  SCtgTbCache *stbEntry = NULL;
  char        *stbKey = taosHashAcquire(dbCache->stbCache, &suid, sizeof(suid));
  if (NULL == stbKey) {
    ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", suid, dbFName);
    goto _return;
  }

  stbEntry = taosHashAcquire(dbCache->tbCache, stbKey, strlen(stbKey));
  if (NULL == stbEntry) {
    ctgDebug("stb 0x%" PRIx64 " name %s not in cache, dbFName:%s", suid, stbKey, dbFName);
  }

  // The name is only needed for the lookup (and the log line) above.
  taosHashRelease(dbCache->stbCache, stbKey);

  if (NULL == stbEntry) {
    goto _return;
  }

  CTG_LOCK(CTG_READ, &stbEntry->metaLock);
  if (NULL == stbEntry->pMeta) {
    ctgDebug("stb 0x%" PRIx64 " meta not in cache, dbFName:%s", suid, dbFName);
    goto _return;
  }

  *pTb = stbEntry;

  ctgDebug("stb 0x%" PRIx64 " meta got in cache, dbFName:%s", suid, dbFName);

  CTG_CACHE_STAT_INC(numOfMetaHit, 1);

  return TSDB_CODE_SUCCESS;

_return:

  ctgReleaseTbMetaToCache(pCtg, dbCache, stbEntry);

  CTG_CACHE_STAT_INC(numOfMetaMiss, 1);

  *pTb = NULL;

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
323
/*
 * Acquire a table's cached index info together with its DB entry.
 *
 * Hit:  *pDb / *pTb are set, the table's indexLock is read-locked and both
 *       entries are acquired; the caller gives everything back with
 *       ctgReleaseTbIndexToCache(). numOfIndexHit is incremented.
 * Miss: (DB absent, table absent, or table entry without pIndex) everything
 *       taken here is released, *pDb and *pTb are set to NULL and
 *       numOfIndexMiss is incremented.
 *
 * The outputs are reset on a miss (as ctgAcquireStbMetaFromCache() does) so
 * that a caller reusing its variables never releases a stale handle twice.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgAcquireTbIndexFromCache(SCatalog *pCtg, char *dbFName, char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb) {
  SCtgDBCache *dbCache = NULL;
  SCtgTbCache *pCache = NULL;
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
  if (NULL == dbCache) {
    ctgDebug("db %s not in cache", dbFName);
    goto _return;
  }

  pCache = taosHashAcquire(dbCache->tbCache, tbName, strlen(tbName));
  if (NULL == pCache) {
    ctgDebug("tb %s not in cache, dbFName:%s", tbName, dbFName);
    goto _return;
  }

  CTG_LOCK(CTG_READ, &pCache->indexLock);
  if (NULL == pCache->pIndex) {
    ctgDebug("tb %s index not in cache, dbFName:%s", tbName, dbFName);
    goto _return;
  }

  *pDb = dbCache;
  *pTb = pCache;

  ctgDebug("tb %s index got in cache, dbFName:%s", tbName, dbFName);

  CTG_CACHE_STAT_INC(numOfIndexHit, 1);

  return TSDB_CODE_SUCCESS;

_return:

  // Handles NULL for either argument, so it covers every miss path above.
  ctgReleaseTbIndexToCache(pCtg, dbCache, pCache);

  CTG_CACHE_STAT_INC(numOfIndexMiss, 1);

  *pDb = NULL;
  *pTb = NULL;

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
363
/*
 * Report whether a table's meta is present in the cache.
 *
 * exist  out: 1 when ctgAcquireTbMetaFromCache() yields a table entry,
 *             0 otherwise
 *
 * Whatever was acquired for the probe is released before returning.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgTbMetaExistInCache(SCatalog *pCtg, char *dbFName, char *tbName, int32_t *exist) {
  SCtgDBCache *dbEntry = NULL;
  SCtgTbCache *tbEntry = NULL;

  ctgAcquireTbMetaFromCache(pCtg, dbFName, tbName, &dbEntry, &tbEntry);

  *exist = (NULL != tbEntry) ? 1 : 0;

  ctgReleaseTbMetaToCache(pCtg, dbEntry, tbEntry);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
380 381
/*
 * Build a heap copy of a table's meta from the cache.
 *
 * For a non-child table the cached meta is copied as is. For a child table
 * the SCTableMeta header is copied from the child's entry and the remainder
 * (from the sversion member on) is copied from its super table's entry,
 * which is looked up by suid.
 *
 * pCtg        catalog handle
 * ctx         lookup context; ctx->pName names the table, and ctx->tbInfo
 *             (inCache, dbId, suid, tbType) is filled in once the table's own
 *             entry is found
 * pTableMeta  out: newly allocated copy owned by the caller, or NULL when the
 *             table (or, for a child table, its super table) is not cached
 *             or an error occurred
 *
 * Returns TSDB_CODE_SUCCESS on a hit or a miss, TSDB_CODE_OUT_OF_MEMORY on
 * allocation failure, and TSDB_CODE_CTG_INTERNAL_ERROR when the super-table
 * entry carries a different suid.
 *
 * Every cache handle is released exactly once: error paths jump to _return
 * without releasing first, and handles are set to NULL as soon as they are
 * given back.
 */
int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta **pTableMeta) {
  int32_t      code = 0;
  SCtgDBCache *dbCache = NULL;
  SCtgTbCache *tbCache = NULL;
  *pTableMeta = NULL;

  char dbFName[TSDB_DB_FNAME_LEN] = {0};
  if (CTG_FLAG_IS_SYS_DB(ctx->flag)) {
    tstrncpy(dbFName, ctx->pName->dbname, sizeof(dbFName));
  } else {
    tNameGetFullDbName(ctx->pName, dbFName);
  }

  ctgAcquireTbMetaFromCache(pCtg, dbFName, ctx->pName->tname, &dbCache, &tbCache);
  if (NULL == tbCache) {
    // Miss: the acquire call holds nothing, so this release is a no-op.
    ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
    return TSDB_CODE_SUCCESS;
  }

  STableMeta *tbMeta = tbCache->pMeta;
  ctx->tbInfo.inCache = true;
  ctx->tbInfo.dbId = dbCache->dbId;
  ctx->tbInfo.suid = tbMeta->suid;
  ctx->tbInfo.tbType = tbMeta->tableType;

  if (tbMeta->tableType != TSDB_CHILD_TABLE) {
    int32_t metaSize = CTG_META_SIZE(tbMeta);
    *pTableMeta = taosMemoryCalloc(1, metaSize);
    if (NULL == *pTableMeta) {
      // _return releases the handles; releasing here as well would unlock twice.
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }

    memcpy(*pTableMeta, tbMeta, metaSize);

    ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
    // Log from the saved copy: tbMeta must not be read after the release.
    ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", ctx->pName->tname, ctx->tbInfo.tbType, dbFName);
    return TSDB_CODE_SUCCESS;
  }

  // PROCESS FOR CHILD TABLE

  int32_t metaSize = sizeof(SCTableMeta);
  *pTableMeta = taosMemoryCalloc(1, metaSize);
  if (NULL == *pTableMeta) {
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  memcpy(*pTableMeta, tbMeta, metaSize);

  // Give back only the child-table entry; the DB entry stays held for the
  // super-table lookup below.
  CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
  taosHashRelease(dbCache->tbCache, tbCache);
  tbCache = NULL;

  ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", ctx->pName->tname,
           ctx->tbInfo.tbType, dbFName);

  ctgAcquireStbMetaFromCache2(dbCache, pCtg, dbFName, ctx->tbInfo.suid, &tbCache);
  if (NULL == tbCache) {
    // On a miss ctgAcquireStbMetaFromCache2() has already released dbCache.
    dbCache = NULL;
    taosMemoryFreeClear(*pTableMeta);
    ctgDebug("stb 0x%" PRIx64 " meta not in cache", ctx->tbInfo.suid);
    return TSDB_CODE_SUCCESS;
  }

  STableMeta *stbMeta = tbCache->pMeta;
  if (stbMeta->suid != ctx->tbInfo.suid) {
    // Log while the entry is still held, then let _return release it.
    ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%" PRIx64, stbMeta->suid, ctx->tbInfo.suid);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  metaSize = CTG_META_SIZE(stbMeta);
  // Keep the old pointer until the realloc is known to have succeeded, so the
  // buffer can still be freed in _return on failure.
  STableMeta *grown = taosMemoryRealloc(*pTableMeta, metaSize);
  if (NULL == grown) {
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }
  *pTableMeta = grown;

  memcpy(&(*pTableMeta)->sversion, &stbMeta->sversion, metaSize - sizeof(SCTableMeta));

  ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);

  ctgDebug("Got tb %s meta from cache, dbFName:%s", ctx->pName->tname, dbFName);

  return TSDB_CODE_SUCCESS;

_return:

  ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
  taosMemoryFreeClear(*pTableMeta);
  *pTableMeta = NULL;

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
473 474
/*
 * Read a table's schema/tag versions from the cache.
 *
 * pTableName  table to look up
 * sver, tver  out: set to -1 on entry; filled from the table's own meta for
 *             a non-child table, or from its super table's meta for a child
 *             table
 * tbType      out: table type; only written when the table is cached
 * suid        out: super table uid; only written when the table is cached
 * stbName     out: for a child table whose super table is cached, receives
 *             the super table's cache key as a NUL-terminated string.
 *             NOTE(review): no buffer size is passed in -- the caller
 *             presumably provides a buffer large enough for any table name;
 *             confirm.
 *
 * Returns TSDB_CODE_SUCCESS on a hit or a miss, and
 * TSDB_CODE_CTG_INTERNAL_ERROR when the super-table entry carries a
 * different suid.
 */
int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver, int32_t *tver, int32_t *tbType,
                              uint64_t *suid, char *stbName) {
  *sver = -1;
  *tver = -1;

  SCtgDBCache *dbCache = NULL;
  SCtgTbCache *tbCache = NULL;
  char         dbFName[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, dbFName);

  ctgAcquireTbMetaFromCache(pCtg, dbFName, pTableName->tname, &dbCache, &tbCache);
  if (NULL == tbCache) {
    // Miss: the acquire call holds nothing, so this release is a no-op.
    ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
    return TSDB_CODE_SUCCESS;
  }

  STableMeta *tbMeta = tbCache->pMeta;
  *tbType = tbMeta->tableType;
  *suid = tbMeta->suid;

  if (*tbType != TSDB_CHILD_TABLE) {
    *sver = tbMeta->sversion;
    *tver = tbMeta->tversion;

    ctgDebug("Got tb %s ver from cache, dbFName:%s, tbType:%d, sver:%d, tver:%d, suid:0x%" PRIx64, pTableName->tname,
             dbFName, *tbType, *sver, *tver, *suid);

    ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
    return TSDB_CODE_SUCCESS;
  }

  // PROCESS FOR CHILD TABLE

  ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
  // Forget the released handles so they can never be released a second time.
  dbCache = NULL;
  tbCache = NULL;
  ctgDebug("Got ctb %s ver from cache, will continue to get its stb ver, dbFName:%s", pTableName->tname, dbFName);

  ctgAcquireStbMetaFromCache(pCtg, dbFName, *suid, &dbCache, &tbCache);
  if (NULL == tbCache) {
    ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
    ctgDebug("stb 0x%" PRIx64 " meta not in cache", *suid);
    return TSDB_CODE_SUCCESS;
  }

  STableMeta *stbMeta = tbCache->pMeta;
  if (stbMeta->suid != *suid) {
    // Log before releasing: stbMeta must not be read once the entry is given back.
    ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid:0x%" PRIx64, stbMeta->suid, *suid);
    ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  size_t nameLen = 0;
  char  *name = taosHashGetKey(tbCache, &nameLen);

  strncpy(stbName, name, nameLen);
  stbName[nameLen] = 0;

  *sver = stbMeta->sversion;
  *tver = stbMeta->tversion;

  ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);

  ctgDebug("Got tb %s sver %d tver %d from cache, type:%d, dbFName:%s", pTableName->tname, *sver, *tver, *tbType,
           dbFName);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
540
/*
 * Read a table's type from its cached meta.
 *
 * tbType  out: written only when the table's meta is cached; left untouched
 *              on a miss
 *
 * Returns TSDB_CODE_SUCCESS on a hit or a miss; a failure code from
 * ctgAcquireTbMetaFromCache() is propagated through CTG_ERR_RET.
 */
int32_t ctgReadTbTypeFromCache(SCatalog *pCtg, char *dbFName, char *tbName, int32_t *tbType) {
  SCtgDBCache *dbEntry = NULL;
  SCtgTbCache *tbEntry = NULL;

  CTG_ERR_RET(ctgAcquireTbMetaFromCache(pCtg, dbFName, tbName, &dbEntry, &tbEntry));

  if (NULL != tbEntry) {
    *tbType = tbEntry->pMeta->tableType;
    ctgReleaseTbMetaToCache(pCtg, dbEntry, tbEntry);

    ctgDebug("Got tb %s tbType %d from cache, dbFName:%s", tbName, *tbType, dbFName);

    return TSDB_CODE_SUCCESS;
  }

  ctgReleaseTbMetaToCache(pCtg, dbEntry, tbEntry);
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
557 558
/*
 * Clone a table's cached index list.
 *
 * pRes  out: set to NULL on entry; on a hit it is filled by
 *            ctgCloneTableIndex() from the cached entry
 *
 * Returns TSDB_CODE_SUCCESS on a miss, otherwise the result of
 * ctgCloneTableIndex(). The cache handles are released on every path.
 */
int32_t ctgReadTbIndexFromCache(SCatalog *pCtg, SName *pTableName, SArray **pRes) {
  int32_t      code = 0;
  SCtgDBCache *dbEntry = NULL;
  SCtgTbCache *tbEntry = NULL;
  char         dbFName[TSDB_DB_FNAME_LEN] = {0};

  *pRes = NULL;

  tNameGetFullDbName(pTableName, dbFName);

  ctgAcquireTbIndexFromCache(pCtg, dbFName, pTableName->tname, &dbEntry, &tbEntry);
  if (NULL == tbEntry) {
    ctgReleaseTbIndexToCache(pCtg, dbEntry, tbEntry);
    return TSDB_CODE_SUCCESS;
  }

  CTG_ERR_JRET(ctgCloneTableIndex(tbEntry->pIndex->pIndex, pRes));

_return:

  // Reached on success as well as on failure: the handles are always given back.
  ctgReleaseTbIndexToCache(pCtg, dbEntry, tbEntry);

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
581
/*
 * Check a user's access to a DB using only cached user info.
 *
 * user     user name, key into pCtg->userCache
 * dbFName  DB name; the part after the first '.' (or the whole name when
 *          there is none) is tested against the system DB names
 * type     requested access (AUTH_TYPE_READ / AUTH_TYPE_WRITE)
 * inCache  out: true when a decision could be made from the cache (system DB
 *               or user entry present), false when the user is not cached
 * pass     out: set to true when access is granted; never written otherwise,
 *               so the caller is expected to initialise it
 *
 * Access is granted for a system DB, for a super user, for a DB listed in the
 * user's createdDbs, for a read request on a DB in readDbs, and for a write
 * request on a DB in writeDbs.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgChkAuthFromCache(SCatalog *pCtg, char *user, char *dbFName, AUTH_TYPE type, bool *inCache, bool *pass) {
  char *dot = strchr(dbFName, '.');
  char *bareName = (NULL != dot) ? dot + 1 : dbFName;

  if (IS_SYS_DBNAME(bareName)) {
    *inCache = true;
    *pass = true;
    ctgDebug("sysdb %s, pass", dbFName);
    return TSDB_CODE_SUCCESS;
  }

  SCtgUserAuth *pUser = (SCtgUserAuth *)taosHashGet(pCtg->userCache, user, strlen(user));
  if (NULL == pUser) {
    ctgDebug("user not in cache, user:%s", user);

    *inCache = false;
    CTG_CACHE_STAT_INC(numOfUserMiss, 1);

    return TSDB_CODE_SUCCESS;
  }

  *inCache = true;

  ctgDebug("Got user from cache, user:%s", user);
  CTG_CACHE_STAT_INC(numOfUserHit, 1);

  if (pUser->superUser) {
    *pass = true;
    return TSDB_CODE_SUCCESS;
  }

  const size_t dbLen = strlen(dbFName);

  CTG_LOCK(CTG_READ, &pUser->lock);

  if (pUser->createdDbs && taosHashGet(pUser->createdDbs, dbFName, dbLen)) {
    *pass = true;
    CTG_UNLOCK(CTG_READ, &pUser->lock);
    return TSDB_CODE_SUCCESS;
  }

  if (pUser->readDbs && taosHashGet(pUser->readDbs, dbFName, dbLen) && type == AUTH_TYPE_READ) {
    *pass = true;
  }

  if (pUser->writeDbs && taosHashGet(pUser->writeDbs, dbFName, dbLen) && type == AUTH_TYPE_WRITE) {
    *pass = true;
  }

  CTG_UNLOCK(CTG_READ, &pUser->lock);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
639
/*
 * Pop the next operation from the global cache-operation queue.
 *
 * The current head node is freed and its successor becomes the new head; the
 * successor's operation is handed out through *op while the successor node
 * itself stays in place as the head.
 *
 * NOTE(review): head->next is dereferenced unchecked, so the caller
 * presumably guarantees that the queue is not empty -- confirm.
 */
void ctgDequeue(SCtgCacheOperation **op) {
  SCtgQNode *oldHead = gCtgMgmt.queue.head;
  SCtgQNode *newHead = oldHead->next;

  gCtgMgmt.queue.head = newHead;

  CTG_QUEUE_DEC();

  taosMemoryFreeClear(oldHead);

  *op = newHead->op;
}

dengyihao's avatar
dengyihao 已提交
652
/*
 * Append a cache operation to the global queue and signal reqSem.
 *
 * pCtg       catalog handle
 * operation  heap-allocated operation. Ownership is taken on every path:
 *            - queue-node allocation failure: operation->data and operation
 *              are freed here and TSDB_CODE_OUT_OF_MEMORY is returned
 *            - queue already stopped: the node is handed to ctgFreeQNode()
 *              and TSDB_CODE_CTG_EXIT is returned
 *            - sync operation: this call blocks on operation->rspSem and then
 *              frees the operation struct itself
 *
 * For a sync operation whose unLocked flag is clear, the read lock on
 * gCtgMgmt.lock is dropped while waiting and re-taken afterwards.
 *
 * Returns TSDB_CODE_SUCCESS once the operation is queued (and, for a sync
 * operation, after the wait on rspSem has returned).
 */
int32_t ctgEnqueue(SCatalog *pCtg, SCtgCacheOperation *operation) {
  SCtgQNode *qnode = taosMemoryCalloc(1, sizeof(SCtgQNode));
  if (NULL == qnode) {
    qError("calloc %d failed", (int32_t)sizeof(SCtgQNode));
    taosMemoryFree(operation->data);
    taosMemoryFree(operation);
    CTG_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  // Read these before the node is published.
  // NOTE(review): presumably because an async operation may be freed by the
  // consumer as soon as it is queued -- confirm.
  bool  waitForRsp = operation->syncOp;
  char *opLabel = gCtgCacheOperation[operation->opId].name;
  if (waitForRsp) {
    tsem_init(&operation->rspSem, 0, 0);
  }

  qnode->op = operation;

  CTG_LOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);

  if (gCtgMgmt.queue.stopQueue) {
    ctgFreeQNode(qnode);
    CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);
    CTG_RET(TSDB_CODE_CTG_EXIT);
  }

  gCtgMgmt.queue.tail->next = qnode;
  gCtgMgmt.queue.tail = qnode;

  gCtgMgmt.queue.stopQueue = operation->stopQueue;

  CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);

  ctgDebug("action [%s] added into queue", opLabel);

  CTG_QUEUE_INC();
  CTG_RT_STAT_INC(numOfOpEnqueue, 1);

  tsem_post(&gCtgMgmt.queue.reqSem);

  if (!waitForRsp) {
    return TSDB_CODE_SUCCESS;
  }

  if (!operation->unLocked) {
    CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock);
  }
  tsem_wait(&operation->rspSem);
  if (!operation->unLocked) {
    CTG_LOCK(CTG_READ, &gCtgMgmt.lock);
  }
  taosMemoryFree(operation);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
705 706
/*
 * Queue a synchronous "drop DB" cache operation.
 *
 * pCtg     catalog handle
 * dbFName  DB name; if the part after the first '.' is a system DB name,
 *          only that part is stored in the message
 * dbId     id of the DB to drop
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY when the operation or its message cannot be
 * allocated, otherwise the result of ctgEnqueue(). Once ctgEnqueue() is
 * called it owns the operation and the message; nothing is freed here after
 * that point.
 */
int32_t ctgDropDbCacheEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Bail out before op is dereferenced.
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_DROP_DB_CACHE;
  op->syncOp = true;

  SCtgDropDBMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDBMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDBMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  const char *p = strchr(dbFName, '.');
  if (p && IS_SYS_DBNAME(p + 1)) {
    dbFName = p + 1;
  }

  msg->pCtg = pCtg;
  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  msg->dbId = dbId;

  op->data = msg;

  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
738 739
/*
 * Queue a "drop DB vgroup info" cache operation.
 *
 * pCtg     catalog handle
 * dbFName  DB name; if the part after the first '.' is a system DB name,
 *          only that part is stored in the message
 * syncOp   true to have ctgEnqueue() wait for the operation's response
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY when the operation or its message cannot be
 * allocated, otherwise the result of ctgEnqueue(). Once ctgEnqueue() is
 * called it owns the operation and the message; nothing is freed here after
 * that point.
 */
int32_t ctgDropDbVgroupEnqueue(SCatalog *pCtg, const char *dbFName, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Bail out before op is dereferenced.
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_DROP_DB_VGROUP;
  op->syncOp = syncOp;

  SCtgDropDbVgroupMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDbVgroupMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDbVgroupMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  const char *p = strchr(dbFName, '.');
  if (p && IS_SYS_DBNAME(p + 1)) {
    dbFName = p + 1;
  }

  msg->pCtg = pCtg;
  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));

  op->data = msg;

  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
770 771 772
/*
 * Queue a "drop super table meta" cache operation.
 *
 * pCtg     catalog handle
 * dbFName  DB name, copied into the message
 * dbId     id of the DB
 * stbName  super table name, copied into the message
 * suid     super table uid
 * syncOp   true to have ctgEnqueue() wait for the operation's response
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY when the operation or its message cannot be
 * allocated, otherwise the result of ctgEnqueue(). Once ctgEnqueue() is
 * called it owns the operation and the message; nothing is freed here after
 * that point.
 */
int32_t ctgDropStbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid,
                              bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Bail out before op is dereferenced.
    ctgError("calloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_DROP_STB_META;
  op->syncOp = syncOp;

  SCtgDropStbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropStbMetaMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropStbMetaMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  tstrncpy(msg->stbName, stbName, sizeof(msg->stbName));
  msg->dbId = dbId;
  msg->suid = suid;

  op->data = msg;

  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
801 802
/*
 * Build a CTG_OP_DROP_TB_META cache operation and pass it to ctgEnqueue().
 *
 * pCtg    - catalog handle recorded in the message and passed to ctgEnqueue()
 * dbFName - database name, copied with tstrncpy bounded by the message field
 * dbId    - stored in the message's dbId field
 * tbName  - table name, copied with tstrncpy bounded by the message field
 * syncOp  - copied into the operation's syncOp field
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the operation or its
 * message cannot be allocated, or the code produced by ctgEnqueue().
 */
int32_t ctgDropTbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Bail out before touching op: the allocation can fail.
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_DROP_TB_META;
  op->syncOp = syncOp;

  SCtgDropTblMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTblMetaMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTblMetaMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  tstrncpy(msg->tbName, tbName, sizeof(msg->tbName));
  msg->dbId = dbId;

  op->data = msg;

  // NOTE(review): op/msg are not released here when ctgEnqueue() fails;
  // presumably ctgEnqueue() owns them from this point - confirm.
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
830 831
/*
 * Build a CTG_OP_UPDATE_VGROUP cache operation carrying dbInfo and pass it to
 * ctgEnqueue().
 *
 * If dbFName contains a '.' and the text after the first '.' satisfies
 * IS_SYS_DBNAME, only that trailing part is stored in the message.
 *
 * pCtg    - catalog handle recorded in the message and passed to ctgEnqueue()
 * dbFName - database name, copied with tstrncpy bounded by the message field
 * dbId    - stored in the message's dbId field
 * dbInfo  - vgroup info pointer stored in the message; on every failure path
 *           of this function it is released with ctgFreeVgInfo()
 * syncOp  - copied into the operation's syncOp field
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the operation or its
 * message cannot be allocated, or the code produced by ctgEnqueue().
 */
int32_t ctgUpdateVgroupEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, SDBVgInfo *dbInfo, bool syncOp) {
  int32_t             code = 0;
  SCtgUpdateVgMsg    *msg = NULL;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Bail out before touching op; _return releases dbInfo like the other failure paths.
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_UPDATE_VGROUP;
  op->syncOp = syncOp;

  msg = taosMemoryMalloc(sizeof(SCtgUpdateVgMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg));
    taosMemoryFree(op);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  // Drop the leading "<prefix>." when what follows is a system database name.
  const char *p = strchr(dbFName, '.');
  if (p && IS_SYS_DBNAME(p + 1)) {
    dbFName = p + 1;
  }

  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  msg->pCtg = pCtg;
  msg->dbId = dbId;
  msg->dbInfo = dbInfo;

  op->data = msg;

  // NOTE(review): op/msg are not released here when ctgEnqueue() fails;
  // presumably ctgEnqueue() owns them from this point - confirm.
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  ctgFreeVgInfo(dbInfo);
  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
866 867
/*
 * Build a CTG_OP_UPDATE_TB_META cache operation carrying `output` and pass it
 * to ctgEnqueue().
 *
 * If output->dbFName contains a '.' and the text after the first '.' satisfies
 * IS_SYS_DBNAME, output->dbFName is rewritten in place to hold only that
 * trailing part.
 *
 * pCtg   - catalog handle recorded in the message and passed to ctgEnqueue()
 * output - table meta output stored in the message; on every failure path of
 *          this function output->tbMeta and output itself are freed
 * syncOp - copied into the operation's syncOp field
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the operation or its
 * message cannot be allocated, or the code produced by ctgEnqueue().
 */
int32_t ctgUpdateTbMetaEnqueue(SCatalog *pCtg, STableMetaOutput *output, bool syncOp) {
  int32_t              code = 0;
  SCtgUpdateTbMetaMsg *msg = NULL;
  SCtgCacheOperation  *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Bail out before touching op; _return releases output like the other failure paths.
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_UPDATE_TB_META;
  op->syncOp = syncOp;

  msg = taosMemoryMalloc(sizeof(SCtgUpdateTbMetaMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbMetaMsg));
    taosMemoryFree(op);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  char *p = strchr(output->dbFName, '.');
  if (p && IS_SYS_DBNAME(p + 1)) {
    size_t len = strlen(p + 1);
    if (len >= TSDB_DB_FNAME_LEN) {
      len = TSDB_DB_FNAME_LEN - 1;
    }

    // Shift the system db name to the front of the buffer. The regions overlap, hence memmove.
    // The terminator must be written explicitly, otherwise the tail of the old, longer string
    // stays behind the moved name.
    memmove(output->dbFName, p + 1, len);
    output->dbFName[len] = '\0';
  }

  msg->pCtg = pCtg;
  msg->pMeta = output;

  op->data = msg;

  // NOTE(review): op/msg are not released here when ctgEnqueue() fails;
  // presumably ctgEnqueue() owns them from this point - confirm.
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  if (output) {
    taosMemoryFree(output->tbMeta);
    taosMemoryFree(output);
  }

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
904 905
/*
 * Build a CTG_OP_UPDATE_VG_EPSET cache operation and pass it to ctgEnqueue().
 *
 * pCtg    - catalog handle recorded in the message and passed to ctgEnqueue()
 * dbFName - database name, copied with tstrncpy bounded by the message field
 * vgId    - stored in the message's vgId field
 * pEpSet  - endpoint set; copied by value into the message
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the operation or its
 * message cannot be allocated, or the code produced by ctgEnqueue().
 */
int32_t ctgUpdateVgEpsetEnqueue(SCatalog *pCtg, char *dbFName, int32_t vgId, SEpSet *pEpSet) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Bail out before touching op: the allocation can fail.
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_UPDATE_VG_EPSET;

  SCtgUpdateEpsetMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateEpsetMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateEpsetMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  msg->vgId = vgId;
  msg->epSet = *pEpSet;

  op->data = msg;

  // NOTE(review): op/msg are not released here when ctgEnqueue() fails;
  // presumably ctgEnqueue() owns them from this point - confirm.
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
932 933
/*
 * Build a CTG_OP_UPDATE_USER cache operation carrying a copy of *pAuth and
 * pass it to ctgEnqueue().
 *
 * pCtg   - catalog handle recorded in the message and passed to ctgEnqueue()
 * pAuth  - user auth response; copied by value (shallow struct copy) into the
 *          message. On every failure path of this function it is passed to
 *          tFreeSGetUserAuthRsp().
 * syncOp - copied into the operation's syncOp field
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the operation or its
 * message cannot be allocated, or the code produced by ctgEnqueue().
 */
int32_t ctgUpdateUserEnqueue(SCatalog *pCtg, SGetUserAuthRsp *pAuth, bool syncOp) {
  int32_t             code = 0;
  SCtgUpdateUserMsg  *msg = NULL;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Bail out before touching op; _return releases pAuth like the enqueue failure path does.
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_UPDATE_USER;
  op->syncOp = syncOp;

  msg = taosMemoryMalloc(sizeof(SCtgUpdateUserMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateUserMsg));
    taosMemoryFree(op);
    // Go through _return so pAuth is released on this failure path too.
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  msg->userAuth = *pAuth;

  op->data = msg;

  // NOTE(review): op/msg are not released here when ctgEnqueue() fails;
  // presumably ctgEnqueue() owns them from this point - confirm.
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  tFreeSGetUserAuthRsp(pAuth);

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
961 962
/*
 * Build a CTG_OP_UPDATE_TB_INDEX cache operation carrying *pIndex and pass it
 * to ctgEnqueue().
 *
 * pCtg   - catalog handle recorded in the message and passed to ctgEnqueue()
 * pIndex - in/out: address of the table index pointer. On success the pointer
 *          has been stored in the message and *pIndex is set to NULL. On
 *          failure the index array is destroyed and *pIndex is freed and
 *          cleared.
 * syncOp - copied into the operation's syncOp field
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the operation or its
 * message cannot be allocated, or the code produced by ctgEnqueue().
 */
int32_t ctgUpdateTbIndexEnqueue(SCatalog *pCtg, STableIndex **pIndex, bool syncOp) {
  int32_t               code = 0;
  SCtgUpdateTbIndexMsg *msg = NULL;
  SCtgCacheOperation   *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Bail out before touching op; _return releases *pIndex like the other failure paths.
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_UPDATE_TB_INDEX;
  op->syncOp = syncOp;

  msg = taosMemoryMalloc(sizeof(SCtgUpdateTbIndexMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbIndexMsg));
    taosMemoryFree(op);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  msg->pIndex = *pIndex;

  op->data = msg;

  // NOTE(review): op/msg are not released here when ctgEnqueue() fails;
  // presumably ctgEnqueue() owns them from this point - confirm.
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  // The message now references the index; clear the caller's pointer.
  *pIndex = NULL;
  return TSDB_CODE_SUCCESS;

_return:

  taosArrayDestroyEx((*pIndex)->pIndex, tFreeSTableIndexInfo);
  taosMemoryFreeClear(*pIndex);

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
992 993
/*
 * Build a CTG_OP_DROP_TB_INDEX cache operation for the table named by pName
 * and pass it to ctgEnqueue().
 *
 * pCtg   - catalog handle recorded in the message and passed to ctgEnqueue()
 * pName  - table name; tNameGetFullDbName() fills the message's dbFName and
 *          pName->tname is copied into the message's tbName
 * syncOp - copied into the operation's syncOp field
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the operation or its
 * message cannot be allocated, or the code produced by ctgEnqueue().
 */
int32_t ctgDropTbIndexEnqueue(SCatalog *pCtg, SName *pName, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Bail out before touching op: the allocation can fail.
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_DROP_TB_INDEX;
  op->syncOp = syncOp;

  SCtgDropTbIndexMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTbIndexMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTbIndexMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  tNameGetFullDbName(pName, msg->dbFName);
  // Bounded copy, matching how the other enqueue helpers fill their name fields.
  tstrncpy(msg->tbName, pName->tname, sizeof(msg->tbName));

  op->data = msg;

  // NOTE(review): op/msg are not released here when ctgEnqueue() fails;
  // presumably ctgEnqueue() owns them from this point - confirm.
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1020 1021
/*
 * Build a CTG_OP_CLEAR_CACHE cache operation and pass it to ctgEnqueue().
 *
 * pCtg      - catalog handle recorded in the message and passed to ctgEnqueue()
 * freeCtg   - stored in the message's freeCtg field
 * stopQueue - copied into the operation's stopQueue field
 * syncOp    - copied into the operation's syncOp field
 *
 * The operation's unLocked field is always set to true.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the operation or its
 * message cannot be allocated, or the code produced by ctgEnqueue().
 */
int32_t ctgClearCacheEnqueue(SCatalog *pCtg, bool freeCtg, bool stopQueue, bool syncOp) {
  int32_t             code = 0;
  SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
  if (NULL == op) {
    // Bail out before touching op: the allocation can fail.
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  op->opId = CTG_OP_CLEAR_CACHE;
  op->syncOp = syncOp;
  op->stopQueue = stopQueue;
  op->unLocked = true;

  SCtgClearCacheMsg *msg = taosMemoryMalloc(sizeof(SCtgClearCacheMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgClearCacheMsg));
    taosMemoryFree(op);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  msg->pCtg = pCtg;
  msg->freeCtg = freeCtg;
  op->data = msg;

  // NOTE(review): op/msg are not released here when ctgEnqueue() fails;
  // presumably ctgEnqueue() owns them from this point - confirm.
  CTG_ERR_JRET(ctgEnqueue(pCtg, op));

  return TSDB_CODE_SUCCESS;

_return:

  CTG_RET(code);
}

D
dapan1121 已提交
1048 1049 1050 1051 1052 1053
/*
 * Initialise a rent manager: reset the read index, derive the slot count from
 * rentSec / CTG_RENT_SLOT_SECOND, record the type and allocate a zeroed slot
 * array.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the slot array
 * cannot be allocated.
 */
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) {
  mgmt->type = type;
  mgmt->slotRIdx = 0;
  mgmt->slotNum = rentSec / CTG_RENT_SLOT_SECOND;

  size_t slotBytes = sizeof(SCtgRentSlot) * mgmt->slotNum;
  mgmt->slots = taosMemoryCalloc(1, slotBytes);

  if (NULL == mgmt->slots) {
    qError("calloc %d failed", (int32_t)slotBytes);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  qDebug("meta rent initialized, type:%d, slotNum:%d", type, mgmt->slotNum);

  return TSDB_CODE_SUCCESS;
}

/*
 * Append one meta record to the rent slot selected by |id % slotNum|.
 *
 * The slot's array is created on first use with element size `size`; `meta`
 * is pushed into it and the slot is flagged as needing a sort. The slot's
 * write lock is held for the whole update.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the array cannot
 * be created or the push fails.
 */
int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size) {
  int32_t       code = 0;
  int16_t       slotIdx = abs((int)(id % mgmt->slotNum));
  SCtgRentSlot *pSlot = &mgmt->slots[slotIdx];

  CTG_LOCK(CTG_WRITE, &pSlot->lock);

  // Lazily create the slot's array.
  if (NULL == pSlot->meta) {
    pSlot->meta = taosArrayInit(CTG_DEFAULT_RENT_SLOT_SIZE, size);
  }

  if (NULL == pSlot->meta) {
    qError("taosArrayInit %d failed, id:0x%" PRIx64 ", slot idx:%d, type:%d", CTG_DEFAULT_RENT_SLOT_SIZE, id, slotIdx,
           mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  if (NULL == taosArrayPush(pSlot->meta, meta)) {
    qError("taosArrayPush meta to rent failed, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  // The new element was appended at the end, so the array order is no longer guaranteed.
  pSlot->needSort = true;

  qDebug("add meta to rent, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);

_return:

  CTG_UNLOCK(CTG_WRITE, &pSlot->lock);
  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1097 1098
/*
 * Overwrite the rent record for `id` with `meta`, or add it when it is absent.
 *
 * Under the slot's write lock the slot is sorted if flagged, then searched for
 * `id`. A hit is overwritten with `size` bytes from `meta`. When the slot has
 * no array or the id is not found, the lock is released and the record is
 * added through ctgMetaRentAdd().
 *
 * Returns TSDB_CODE_SUCCESS after an in-place update, otherwise the result of
 * ctgMetaRentAdd().
 */
int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t sortCompare,
                          __compar_fn_t searchCompare) {
  int32_t       code = 0;
  int16_t       slotIdx = abs((int)(id % mgmt->slotNum));
  SCtgRentSlot *pSlot = &mgmt->slots[slotIdx];
  void         *pFound = NULL;

  CTG_LOCK(CTG_WRITE, &pSlot->lock);

  if (NULL == pSlot->meta) {
    qDebug("empty meta slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  // The search below requires a sorted array.
  if (pSlot->needSort) {
    qDebug("meta slot before sorte, slot idx:%d, type:%d, size:%d", slotIdx, mgmt->type,
           (int32_t)taosArrayGetSize(pSlot->meta));
    taosArraySort(pSlot->meta, sortCompare);
    pSlot->needSort = false;
    qDebug("meta slot sorted, slot idx:%d, type:%d, size:%d", slotIdx, mgmt->type,
           (int32_t)taosArrayGetSize(pSlot->meta));
  }

  pFound = taosArraySearch(pSlot->meta, &id, searchCompare, TD_EQ);
  if (NULL == pFound) {
    qDebug("meta not found in slot, id:0x%" PRIx64 ", slot idx:%d, type:%d, size:%d", id, slotIdx, mgmt->type,
           (int32_t)taosArrayGetSize(pSlot->meta));
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  memcpy(pFound, meta, size);

  qDebug("meta in rent updated, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);

_return:

  CTG_UNLOCK(CTG_WRITE, &pSlot->lock);

  // Any failure above falls back to adding the record; ctgMetaRentAdd() takes the slot lock itself.
  if (code != 0) {
    qDebug("meta in rent update failed, will try to add it, code:%x, id:0x%" PRIx64 ", slot idx:%d, type:%d", code, id,
           slotIdx, mgmt->type);
    CTG_RET(ctgMetaRentAdd(mgmt, meta, id, size));
  }

  CTG_RET(code);
}

/*
 * Remove the rent record for `id` from the slot selected by |id % slotNum|.
 *
 * Under the slot's write lock the slot is sorted if flagged, searched for
 * `id`, and the matching element is removed.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CTG_INTERNAL_ERROR when the slot has
 * no array or the id is not found.
 */
int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t sortCompare, __compar_fn_t searchCompare) {
  int32_t       code = 0;
  int32_t       pos = -1;
  int16_t       slotIdx = abs((int)(id % mgmt->slotNum));
  SCtgRentSlot *pSlot = &mgmt->slots[slotIdx];

  CTG_LOCK(CTG_WRITE, &pSlot->lock);

  if (NULL == pSlot->meta) {
    qError("empty meta slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  // The search below requires a sorted array.
  if (pSlot->needSort) {
    taosArraySort(pSlot->meta, sortCompare);
    pSlot->needSort = false;
    qDebug("meta slot sorted, slot idx:%d, type:%d", slotIdx, mgmt->type);
  }

  pos = taosArraySearchIdx(pSlot->meta, &id, searchCompare, TD_EQ);
  if (pos < 0) {
    qError("meta not found in slot, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  taosArrayRemove(pSlot->meta, pos);

  qDebug("meta in rent removed, id:0x%" PRIx64 ", slot idx:%d, type:%d", id, slotIdx, mgmt->type);

_return:

  CTG_UNLOCK(CTG_WRITE, &pSlot->lock);

  CTG_RET(code);
}

/*
 * Copy out every record held in the next rent slot.
 *
 * The shared read index is advanced by one (wrapping at slotNum) and the slot
 * it selects is read under its read lock.
 *
 * mgmt - rent manager
 * res  - out: newly allocated buffer holding a copy of the slot's records, or
 *        NULL when the slot is empty or on failure; the buffer is allocated
 *        with taosMemoryMalloc()
 * num  - out: number of records copied into *res (0 when empty or on failure)
 * size - size in bytes of one record
 *
 * Returns TSDB_CODE_SUCCESS (including the empty case) or
 * TSDB_CODE_OUT_OF_MEMORY when the output buffer cannot be allocated.
 */
int32_t ctgMetaRentGetImpl(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
  int16_t ridx = atomic_add_fetch_16(&mgmt->slotRIdx, 1);
  if (ridx >= mgmt->slotNum) {
    ridx %= mgmt->slotNum;
    atomic_store_16(&mgmt->slotRIdx, ridx);
  }

  SCtgRentSlot *slot = &mgmt->slots[ridx];
  int32_t       code = 0;
  size_t        metaNum = 0;
  size_t        msize = 0;

  // Give both outputs a defined value on every path, so callers never see a stale pointer.
  *res = NULL;
  *num = 0;

  CTG_LOCK(CTG_READ, &slot->lock);
  if (NULL == slot->meta) {
    qDebug("empty meta in slot:%d, type:%d", ridx, mgmt->type);
    goto _return;
  }

  metaNum = taosArrayGetSize(slot->meta);
  if (metaNum <= 0) {
    qDebug("no meta in slot:%d, type:%d", ridx, mgmt->type);
    goto _return;
  }

  msize = metaNum * size;
  *res = taosMemoryMalloc(msize);
  if (NULL == *res) {
    qError("malloc %d failed", (int32_t)msize);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  // All msize bytes are copied starting at the address of element 0.
  void *meta = taosArrayGet(slot->meta, 0);

  memcpy(*res, meta, msize);

  *num = (uint32_t)metaNum;

  qDebug("Got %d meta from rent, type:%d", (int32_t)metaNum, mgmt->type);

_return:

  CTG_UNLOCK(CTG_READ, &slot->lock);

  CTG_RET(code);
}

/*
 * Rate-limited front end for ctgMetaRentGetImpl().
 *
 * When less than CTG_RENT_SLOT_SECOND seconds have passed since the last
 * recorded read, *res is set to NULL, *num to 0 and TSDB_CODE_SUCCESS is
 * returned. Otherwise the caller that wins the compare-and-exchange on
 * lastReadMsec performs the read; a loser re-evaluates the time check.
 *
 * Returns TSDB_CODE_SUCCESS or the error from ctgMetaRentGetImpl().
 */
int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
  for (;;) {
    int64_t nowMs = taosGetTimestampMs();
    int64_t lastMs = atomic_load_64(&mgmt->lastReadMsec);

    if ((nowMs - lastMs) < CTG_RENT_SLOT_SECOND * 1000) {
      *res = NULL;
      *num = 0;
      qDebug("too short time period to get expired meta, type:%d", mgmt->type);
      return TSDB_CODE_SUCCESS;
    }

    // Claim this read window; retry if the timestamp changed in the meantime.
    if (lastMs == atomic_val_compare_exchange_64(&mgmt->lastReadMsec, lastMs, nowMs)) {
      break;
    }
  }

  CTG_ERR_RET(ctgMetaRentGetImpl(mgmt, res, num, size));

  return TSDB_CODE_SUCCESS;
}

/*
 * Create an empty db cache entry (table hash + super table hash) and insert it
 * into pCtg->dbCache under dbFName.
 *
 * On a successful insert the db counter is incremented and, unless dbFName
 * satisfies IS_SYS_DBNAME, a SDbVgVersion record with vgVersion -1 is added to
 * the db rent.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when a hash cannot be
 * created or the insert fails, the taosHashPut() code when the entry already
 * exists, or the error from ctgMetaRentAdd(). On the failure paths reached
 * before a successful insert (other than the first hash allocation failing)
 * the local entry is released with ctgFreeDbCache().
 */
int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
  int32_t     code = 0;
  SCtgDBCache entry = {0};

  entry.dbId = dbId;

  entry.tbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true,
                               HASH_ENTRY_LOCK);
  if (NULL == entry.tbCache) {
    ctgError("taosHashInit %d metaCache failed", gCtgMgmt.cfg.maxTblCacheNum);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  entry.stbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true,
                                HASH_ENTRY_LOCK);
  if (NULL == entry.stbCache) {
    ctgError("taosHashInit %d stbCache failed", gCtgMgmt.cfg.maxTblCacheNum);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  code = taosHashPut(pCtg->dbCache, dbFName, strlen(dbFName), &entry, sizeof(SCtgDBCache));
  if (code != 0) {
    if (HASH_NODE_EXIST(code)) {
      // The hash already holds an entry for this db; drop the local one and report the hash code.
      ctgDebug("db already in cache, dbFName:%s", dbFName);
      goto _return;
    }

    ctgError("taosHashPut db to cache failed, dbFName:%s", dbFName);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  CTG_CACHE_STAT_INC(numOfDb, 1);

  ctgDebug("db added to cache, dbFName:%s, dbId:0x%" PRIx64, dbFName, dbId);

  if (!IS_SYS_DBNAME(dbFName)) {
    SDbVgVersion rentRec = {.dbId = entry.dbId, .vgVersion = -1, .stateTs = 0};
    tstrncpy(rentRec.dbFName, dbFName, sizeof(rentRec.dbFName));

    // The entry has been copied into the hash by now, so a rent failure returns without freeing it.
    CTG_ERR_RET(ctgMetaRentAdd(&pCtg->dbRent, &rentRec, dbId, sizeof(SDbVgVersion)));

    ctgDebug("db added to rent, dbFName:%s, vgVersion:%d, dbId:0x%" PRIx64, dbFName, rentRec.vgVersion, dbId);
  }

  return TSDB_CODE_SUCCESS;

_return:

  ctgFreeDbCache(&entry);

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1299
/*
 * Remove the rent record of every super table listed in dbCache->stbCache.
 *
 * Each hash key is read as a suid and passed to ctgMetaRentRemove() on
 * pCtg->stbRent. Removal failures are ignored. Does nothing when the db has no
 * stbCache.
 */
void ctgRemoveStbRent(SCatalog *pCtg, SCtgDBCache *dbCache) {
  if (NULL == dbCache->stbCache) {
    return;
  }

  for (void *pIter = taosHashIterate(dbCache->stbCache, NULL); pIter != NULL;
       pIter = taosHashIterate(dbCache->stbCache, pIter)) {
    uint64_t *pSuid = taosHashGetKey(pIter, NULL);

    int32_t rc = ctgMetaRentRemove(&pCtg->stbRent, *pSuid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare);
    if (TSDB_CODE_SUCCESS == rc) {
      ctgDebug("stb removed from rent, suid:0x%" PRIx64, *pSuid);
    }
  }
}

dengyihao's avatar
dengyihao 已提交
1318
/*
 * Remove a database from the catalog cache.
 *
 * Under the db write lock the entry is marked deleted, its super tables are
 * removed from the stb rent and its contents are released with
 * ctgFreeDbCache(). After the lock is released the db is removed from the db
 * rent and then from pCtg->dbCache, and the db counter is decremented.
 *
 * Returns TSDB_CODE_SUCCESS, the error from ctgMetaRentRemove(), or
 * TSDB_CODE_CTG_DB_DROPPED when the hash removal fails.
 */
int32_t ctgRemoveDBFromCache(SCatalog *pCtg, SCtgDBCache *dbCache, const char *dbFName) {
  // Read the id before dbCache is handed to ctgFreeDbCache() below.
  uint64_t removedId = dbCache->dbId;

  ctgInfo("start to remove db from cache, dbFName:%s, dbId:0x%" PRIx64, dbFName, removedId);

  CTG_LOCK(CTG_WRITE, &dbCache->dbLock);
  atomic_store_8(&dbCache->deleted, 1);
  ctgRemoveStbRent(pCtg, dbCache);
  ctgFreeDbCache(dbCache);
  CTG_UNLOCK(CTG_WRITE, &dbCache->dbLock);

  CTG_ERR_RET(ctgMetaRentRemove(&pCtg->dbRent, removedId, ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare));
  ctgDebug("db removed from rent, dbFName:%s, dbId:0x%" PRIx64, dbFName, removedId);

  if (0 != taosHashRemove(pCtg->dbCache, dbFName, strlen(dbFName))) {
    ctgInfo("taosHashRemove from dbCache failed, may be removed, dbFName:%s", dbFName);
    CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
  }

  CTG_CACHE_STAT_DEC(numOfDb, 1);
  ctgInfo("db removed from cache, dbFName:%s, dbId:0x%" PRIx64, dbFName, removedId);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1345 1346
/*
 * Return the cache entry for dbFName, creating it when necessary.
 *
 * An existing entry is reused when the caller passes dbId 0, when the entry
 * has no id yet (it then adopts dbId), or when the ids match. An entry with a
 * different non-zero id is removed and replaced by a fresh one.
 *
 * pCache - out: the entry found or created; after a fresh insert this is
 *          whatever the second ctgGetDBCache() lookup yields
 *
 * Returns TSDB_CODE_SUCCESS, or the error from ctgRemoveDBFromCache() /
 * ctgAddNewDBCache().
 */
int32_t ctgGetAddDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId, SCtgDBCache **pCache) {
  int32_t      code = 0;
  SCtgDBCache *pFound = NULL;

  ctgGetDBCache(pCtg, dbFName, &pFound);

  if (NULL != pFound) {
    // TODO OPEN IT
#if 0
    if (pFound->dbId == dbId) {
      *pCache = pFound;
      return TSDB_CODE_SUCCESS;
    }
#else
    // An entry without an id adopts the caller's id.
    if (dbId && (0 == pFound->dbId)) {
      pFound->dbId = dbId;
    }

    if (0 == dbId || pFound->dbId == dbId) {
      *pCache = pFound;
      return TSDB_CODE_SUCCESS;
    }
#endif
    // Same name but a different id: drop the cached entry before re-adding.
    CTG_ERR_RET(ctgRemoveDBFromCache(pCtg, pFound, dbFName));
  }

  CTG_ERR_RET(ctgAddNewDBCache(pCtg, dbFName, dbId));

  ctgGetDBCache(pCtg, dbFName, &pFound);
  *pCache = pFound;

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1386 1387
/*
 * Refresh the stb rent record for `suid` from a table cache entry.
 *
 * The record carries dbId, suid, both names, the schema/tag versions from
 * pCache->pMeta (when present) and the index version from pCache->pIndex (when
 * present); absent parts stay zero. It is written with ctgMetaRentUpdate().
 *
 * Returns TSDB_CODE_SUCCESS or the error from ctgMetaRentUpdate().
 */
int32_t ctgUpdateRentStbVersion(SCatalog *pCtg, char *dbFName, char *tbName, uint64_t dbId, uint64_t suid,
                                SCtgTbCache *pCache) {
  SSTableVersion stbVer = {.dbId = dbId, .suid = suid};

  tstrncpy(stbVer.dbFName, dbFName, sizeof(stbVer.dbFName));
  tstrncpy(stbVer.stbName, tbName, sizeof(stbVer.stbName));

  if (NULL != pCache->pMeta) {
    stbVer.sversion = pCache->pMeta->sversion;
    stbVer.tversion = pCache->pMeta->tversion;
  }

  if (NULL != pCache->pIndex) {
    stbVer.smaVer = pCache->pIndex->version;
  }

  CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->stbRent, &stbVer, stbVer.suid, sizeof(SSTableVersion),
                                ctgStbVersionSortCompare, ctgStbVersionSearchCompare));

  ctgDebug("db %s,0x%" PRIx64 " stb %s,0x%" PRIx64 " sver %d tver %d smaVer %d updated to stbRent", dbFName, dbId,
           tbName, suid, stbVer.sversion, stbVer.tversion, stbVer.smaVer);

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
1409

dengyihao's avatar
dengyihao 已提交
1410 1411
/*
 * Store table meta in a db's table cache, replacing any older meta.
 *
 * pCtg     - catalog handle (used for logging and the stb rent)
 * dbCache  - db cache entry to write into
 * dbFName  - database name (logging and rent record)
 * dbId     - database id recorded in the stb rent
 * tbName   - table name, used as the tbCache key
 * meta     - meta to store. It is freed here when the db is being dropped,
 *            when the update is ignored, or when the hash insert fails;
 *            otherwise the pointer is stored in the cache entry.
 * metaSize - not used by this function
 *
 * The update is ignored when the cached meta has the same type and uid and is
 * either a child table or already at the same or newer schema and tag
 * versions. When the cached meta was a super table its stbCache entry is
 * removed first. For a super table the suid -> name mapping is written to
 * stbCache and the stb rent is refreshed.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_CTG_DB_DROPPED when the db has no
 * tbCache/stbCache, TSDB_CODE_OUT_OF_MEMORY when a hash insert fails, or the
 * error from ctgUpdateRentStbVersion().
 */
int32_t ctgWriteTbMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, uint64_t dbId, char *tbName,
                              STableMeta *meta, int32_t metaSize) {
  if (NULL == dbCache->tbCache || NULL == dbCache->stbCache) {
    taosMemoryFree(meta);
    ctgError("db is dropping, dbId:0x%" PRIx64, dbCache->dbId);
    CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
  }

  bool         isStb = meta->tableType == TSDB_SUPER_TABLE;
  SCtgTbCache *pCache = taosHashGet(dbCache->tbCache, tbName, strlen(tbName));
  STableMeta  *orig = (pCache ? pCache->pMeta : NULL);
  // Remember now whether the table had no meta: orig is freed further down and must not be inspected after that.
  bool         isNewTbl = (NULL == orig);
  int8_t       origType = 0;

  if (orig) {
    origType = orig->tableType;

    if (origType == meta->tableType && orig->uid == meta->uid &&
        (origType == TSDB_CHILD_TABLE || (orig->sversion >= meta->sversion && orig->tversion >= meta->tversion))) {
      taosMemoryFree(meta);
      ctgDebug("ignore table %s meta update", tbName);
      return TSDB_CODE_SUCCESS;
    }

    if (origType == TSDB_SUPER_TABLE) {
      if (taosHashRemove(dbCache->stbCache, &orig->suid, sizeof(orig->suid))) {
        ctgError("stb not exist in stbCache, dbFName:%s, stb:%s, suid:0x%" PRIx64, dbFName, tbName, orig->suid);
      } else {
        CTG_CACHE_STAT_DEC(numOfStb, 1);
        ctgDebug("stb removed from stbCache, dbFName:%s, stb:%s, suid:0x%" PRIx64, dbFName, tbName, orig->suid);
      }
    }
  }

  if (NULL == pCache) {
    SCtgTbCache cache = {0};
    cache.pMeta = meta;
    if (taosHashPut(dbCache->tbCache, tbName, strlen(tbName), &cache, sizeof(SCtgTbCache)) != 0) {
      // Log first: the message reads meta->tableType, so meta must still be alive here.
      ctgError("taosHashPut new tbCache failed, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
      taosMemoryFree(meta);
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    pCache = taosHashGet(dbCache->tbCache, tbName, strlen(tbName));
  } else {
    CTG_LOCK(CTG_WRITE, &pCache->metaLock);
    taosMemoryFree(pCache->pMeta);
    pCache->pMeta = meta;
    CTG_UNLOCK(CTG_WRITE, &pCache->metaLock);
  }

  if (isNewTbl) {
    CTG_CACHE_STAT_INC(numOfTbl, 1);
  }

  ctgDebug("tbmeta updated to cache, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
  ctgdShowTableMeta(pCtg, tbName, meta);

  if (!isStb) {
    return TSDB_CODE_SUCCESS;
  }

  // Super tables are additionally indexed by suid -> table name (terminator included).
  if (taosHashPut(dbCache->stbCache, &meta->suid, sizeof(meta->suid), tbName, strlen(tbName) + 1) != 0) {
    ctgError("taosHashPut to stable cache failed, suid:0x%" PRIx64, meta->suid);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  CTG_CACHE_STAT_INC(numOfStb, 1);

  ctgDebug("stb 0x%" PRIx64 " updated to cache, dbFName:%s, tbName:%s, tbType:%d", meta->suid, dbFName, tbName,
           meta->tableType);

  if (pCache) {
    CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbId, meta->suid, pCache));
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1488
/*
 * Store a table index in a db's table cache, replacing any older index.
 *
 * index - in/out: address of the index pointer. Once the index has been
 *         stored in the cache *index is set to NULL. When the db has no
 *         tbCache or the hash insert fails, the index is released and *index
 *         cleared.
 *
 * When the resulting suid is non-zero (taken from the new index, or from the
 * replaced one if the new index has suid 0), the stb rent is refreshed.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_CTG_DB_DROPPED when the db has no
 * tbCache, TSDB_CODE_OUT_OF_MEMORY when the hash insert fails, or the error
 * from ctgUpdateRentStbVersion().
 */
int32_t ctgWriteTbIndexToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, char *tbName, STableIndex **index) {
  if (NULL == dbCache->tbCache) {
    ctgFreeSTableIndex(*index);
    taosMemoryFreeClear(*index);
    ctgError("db is dropping, dbId:0x%" PRIx64, dbCache->dbId);
    CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
  }

  STableIndex *pNewIdx = *index;
  uint64_t     rentSuid = pNewIdx->suid;
  SCtgTbCache *pTbCache = taosHashGet(dbCache->tbCache, tbName, strlen(tbName));

  if (NULL != pTbCache) {
    // Existing table entry: swap the index under the index lock.
    CTG_LOCK(CTG_WRITE, &pTbCache->indexLock);

    if (pTbCache->pIndex) {
      if (0 == rentSuid) {
        rentSuid = pTbCache->pIndex->suid;
      }
      taosArrayDestroyEx(pTbCache->pIndex->pIndex, tFreeSTableIndexInfo);
      taosMemoryFreeClear(pTbCache->pIndex);
    }

    pTbCache->pIndex = pNewIdx;
    CTG_UNLOCK(CTG_WRITE, &pTbCache->indexLock);

    *index = NULL;

    ctgDebug("table %s index updated to cache, ver:%d, num:%d", tbName, pNewIdx->version,
             (int32_t)taosArrayGetSize(pNewIdx->pIndex));

    if (rentSuid) {
      CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbCache->dbId, rentSuid, pTbCache));
    }

    return TSDB_CODE_SUCCESS;
  }

  // No entry for this table yet: insert one that carries only the index.
  SCtgTbCache newEntry = {0};
  newEntry.pIndex = pNewIdx;

  if (taosHashPut(dbCache->tbCache, tbName, strlen(tbName), &newEntry, sizeof(newEntry)) != 0) {
    ctgFreeSTableIndex(*index);
    taosMemoryFreeClear(*index);
    ctgError("taosHashPut new tbCache failed, tbName:%s", tbName);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  *index = NULL;
  ctgDebug("table %s index updated to cache, ver:%d, num:%d", tbName, pNewIdx->version,
           (int32_t)taosArrayGetSize(pNewIdx->pIndex));

  if (rentSuid) {
    CTG_ERR_RET(ctgUpdateRentStbVersion(pCtg, dbFName, tbName, dbCache->dbId, pNewIdx->suid, &newEntry));
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1546 1547 1548 1549
/*
 * Clone pOut and queue the clone as a table meta update.
 *
 * The clone is passed to ctgUpdateTbMetaEnqueue() and the local pointer is
 * cleared straight after the call, whatever the outcome, so the cleanup at
 * _return always receives NULL.
 *
 * Returns TSDB_CODE_SUCCESS, or the error from ctgCloneMetaOutput() /
 * ctgUpdateTbMetaEnqueue().
 */
int32_t ctgUpdateTbMetaToCache(SCatalog *pCtg, STableMetaOutput *pOut, bool syncReq) {
  int32_t           code = 0;
  STableMetaOutput *pClone = NULL;

  CTG_ERR_RET(ctgCloneMetaOutput(pOut, &pClone));

  code = ctgUpdateTbMetaEnqueue(pCtg, pClone, syncReq);
  // The clone is no longer ours after the enqueue call; forget it before checking the result.
  pClone = NULL;
  CTG_ERR_JRET(code);

  return TSDB_CODE_SUCCESS;

_return:

  ctgFreeSTableMetaOutput(pClone);
  CTG_RET(code);
}

D
dapan1121 已提交
1563
void ctgClearAllInstance(void) {
dengyihao's avatar
dengyihao 已提交
1564
  SCatalog *pCtg = NULL;
1565

dengyihao's avatar
dengyihao 已提交
1566
  void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
1567
  while (pIter) {
dengyihao's avatar
dengyihao 已提交
1568
    pCtg = *(SCatalog **)pIter;
1569 1570

    if (pCtg) {
D
dapan1121 已提交
1571 1572 1573 1574 1575 1576 1577 1578
      ctgClearHandle(pCtg);
    }

    pIter = taosHashIterate(gCtgMgmt.pCluster, pIter);
  }
}

void ctgFreeAllInstance(void) {
dengyihao's avatar
dengyihao 已提交
1579
  SCatalog *pCtg = NULL;
D
dapan1121 已提交
1580

dengyihao's avatar
dengyihao 已提交
1581
  void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
D
dapan1121 已提交
1582
  while (pIter) {
dengyihao's avatar
dengyihao 已提交
1583
    pCtg = *(SCatalog **)pIter;
D
dapan1121 已提交
1584 1585 1586

    if (pCtg) {
      ctgFreeHandle(pCtg);
1587 1588 1589 1590 1591 1592 1593 1594
    }

    pIter = taosHashIterate(gCtgMgmt.pCluster, pIter);
  }

  taosHashClear(gCtgMgmt.pCluster);
}

D
dapan1121 已提交
1595
/*
 * Cache operation handler: install the vgroup info carried by the message as
 * the cached vgroup info of its database.
 *
 * The update is skipped when the catalog has stopped updating, when the
 * message carries no vgroup hash, when the cached info has a higher vgVersion,
 * or when vgVersion, numOfTable and stateTs all match the cached info.
 * When the update is applied, the cache keeps msg->dbInfo and
 * ctgMetaRentUpdate is called on the catalog's dbRent.
 * The message, and any vgroup info still attached to it, is always released
 * before returning.
 *
 * Returns 0 on success or when the update is skipped, otherwise an error code.
 */
int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
  int32_t          code = 0;
  SCtgUpdateVgMsg *msg = operation->data;
  SDBVgInfo       *dbInfo = msg->dbInfo;
  char            *dbFName = msg->dbFName;
  SCatalog        *pCtg = msg->pCtg;

  if (pCtg->stopUpdate || NULL == dbInfo->vgHash) {
    goto _return;
  }

  if (dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) {
    ctgError("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d, vgHashSize:%d", dbFName, dbInfo->vgHash,
             dbInfo->vgVersion, taosHashGetSize(dbInfo->vgHash));
    CTG_ERR_JRET(TSDB_CODE_APP_ERROR);
  }

  bool newAdded = false;

  // Snapshot the version fields now: dbInfo is handed over to the cache below,
  // and these values are still needed for logging and the rent update.
  SDbVgVersion vgVersion = {
      .dbId = msg->dbId, .vgVersion = dbInfo->vgVersion, .numOfTable = dbInfo->numOfTable, .stateTs = dbInfo->stateTs};

  SCtgDBCache *dbCache = NULL;
  CTG_ERR_JRET(ctgGetAddDBCache(msg->pCtg, dbFName, msg->dbId, &dbCache));
  if (!dbCache) {
    ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:0x%" PRIx64, dbFName, msg->dbId);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  SCtgVgCache *vgCache = &dbCache->vgCache;
  CTG_ERR_JRET(ctgWLockVgInfo(msg->pCtg, dbCache));

  SDBVgInfo *cached = vgCache->vgInfo;
  if (cached != NULL) {
    // Cached info is newer than the incoming one: keep the cache.
    if (dbInfo->vgVersion < cached->vgVersion) {
      ctgDebug("db updateVgroup is ignored, dbFName:%s, vgVer:%d, curVer:%d", dbFName, dbInfo->vgVersion,
               cached->vgVersion);
      ctgWUnlockVgInfo(dbCache);

      goto _return;
    }

    // Nothing changed compared with the cache: keep the cache.
    if (dbInfo->vgVersion == cached->vgVersion && dbInfo->numOfTable == cached->numOfTable &&
        dbInfo->stateTs == cached->stateTs) {
      ctgDebug("no new db vgroup update info, dbFName:%s, vgVer:%d, numOfTable:%d, stateTs:%" PRId64, dbFName,
               dbInfo->vgVersion, dbInfo->numOfTable, dbInfo->stateTs);
      ctgWUnlockVgInfo(dbCache);

      goto _return;
    }

    ctgFreeVgInfo(cached);
  }

  // Hand the incoming info over to the cache and detach it from the message so
  // the cleanup at _return does not free it.
  vgCache->vgInfo = dbInfo;
  msg->dbInfo = NULL;

  ctgDebug("db vgInfo updated, dbFName:%s, vgVer:%d, stateTs:%" PRId64 ", dbId:0x%" PRIx64, dbFName,
           vgVersion.vgVersion, vgVersion.stateTs, vgVersion.dbId);

  ctgWUnlockVgInfo(dbCache);

  dbCache = NULL;

  // The rent entry is updated for every database; a former system-db
  // exclusion (IS_SYS_DBNAME check) is no longer applied here.
  tstrncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
  CTG_ERR_JRET(ctgMetaRentUpdate(&msg->pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion),
                                 ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare));

_return:

  ctgFreeVgInfo(msg->dbInfo);
  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
1673
/*
 * Cache operation handler: remove a database from the catalog cache.
 *
 * Nothing is removed when the catalog has stopped updating, when the database
 * is not cached, or when the message names a non-zero dbId that differs from
 * the cached one. The message is always freed.
 *
 * Returns 0, or the error reported by ctgRemoveDBFromCache.
 */
int32_t ctgOpDropDbCache(SCtgCacheOperation *operation) {
  int32_t        code = 0;
  SCtgDropDBMsg *msg = operation->data;
  SCatalog      *pCtg = msg->pCtg;
  SCtgDBCache   *dbCache = NULL;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  ctgGetDBCache(msg->pCtg, msg->dbFName, &dbCache);
  if (!dbCache) {
    goto _return;
  }

  // A zero dbId in the message matches any cached database with this name.
  if (0 != msg->dbId && msg->dbId != dbCache->dbId) {
    ctgInfo("dbId already updated, dbFName:%s, dbId:0x%" PRIx64 ", targetId:0x%" PRIx64, msg->dbFName, dbCache->dbId,
            msg->dbId);
    goto _return;
  }

  CTG_ERR_JRET(ctgRemoveDBFromCache(pCtg, dbCache, msg->dbFName));

_return:

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
1703
/*
 * Cache operation handler: discard the cached vgroup info of a database while
 * keeping the database cache entry itself.
 *
 * Does nothing when the catalog has stopped updating or the database is not
 * cached. The message is always freed.
 *
 * Returns 0, or the error reported by ctgWLockVgInfo.
 */
int32_t ctgOpDropDbVgroup(SCtgCacheOperation *operation) {
  int32_t              code = 0;
  SCtgDropDbVgroupMsg *msg = operation->data;
  SCatalog            *pCtg = msg->pCtg;
  SCtgDBCache         *dbCache = NULL;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  ctgGetDBCache(msg->pCtg, msg->dbFName, &dbCache);
  if (!dbCache) {
    goto _return;
  }

  CTG_ERR_JRET(ctgWLockVgInfo(pCtg, dbCache));

  // Free and reset the cached vgroup info under the write lock.
  ctgFreeVgInfo(dbCache->vgCache.vgInfo);
  dbCache->vgCache.vgInfo = NULL;

  ctgDebug("db vgInfo removed, dbFName:%s", msg->dbFName);

  ctgWUnlockVgInfo(dbCache);

_return:

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
1734
/*
 * Cache operation handler: write the table meta carried by the message into
 * the cache of its database.
 *
 * Depending on pMeta->metaType, the full table meta (pMeta->tbMeta, stored
 * under pMeta->tbName), a heap copy of the child-table meta (pMeta->ctbMeta,
 * stored under pMeta->ctbName), or both are written.
 * pMeta->tbMeta, pMeta and the message are always released before returning.
 *
 * Returns 0 on success or when the catalog has stopped updating, otherwise an
 * error code.
 */
int32_t ctgOpUpdateTbMeta(SCtgCacheOperation *operation) {
  int32_t              code = 0;
  SCtgUpdateTbMetaMsg *msg = operation->data;
  SCatalog            *pCtg = msg->pCtg;
  STableMetaOutput    *pMeta = msg->pMeta;
  SCtgDBCache         *dbCache = NULL;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  // Every meta type except a pure child-table response must carry tbMeta.
  if (NULL == pMeta->tbMeta && !CTG_IS_META_CTABLE(pMeta->metaType)) {
    ctgError("no valid tbmeta got from meta rsp, dbFName:%s, tbName:%s", pMeta->dbFName, pMeta->tbName);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  // A "both" response pairs a child table with its super table, so the full
  // meta is required to be the super table's.
  if (CTG_IS_META_BOTH(pMeta->metaType) && pMeta->tbMeta->tableType != TSDB_SUPER_TABLE) {
    ctgError("table type error, expected:%d, actual:%d", TSDB_SUPER_TABLE, pMeta->tbMeta->tableType);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  CTG_ERR_JRET(ctgGetAddDBCache(pCtg, pMeta->dbFName, pMeta->dbId, &dbCache));
  if (!dbCache) {
    ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:0x%" PRIx64, pMeta->dbFName, pMeta->dbId);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  if (CTG_IS_META_TABLE(pMeta->metaType) || CTG_IS_META_BOTH(pMeta->metaType)) {
    int32_t tbMetaSize = CTG_META_SIZE(pMeta->tbMeta);
    code = ctgWriteTbMetaToCache(pCtg, dbCache, pMeta->dbFName, pMeta->dbId, pMeta->tbName, pMeta->tbMeta, tbMetaSize);
    // The pointer is detached whether or not the write succeeded, so the
    // cleanup at _return never frees it after this call.
    pMeta->tbMeta = NULL;
    CTG_ERR_JRET(code);
  }

  if (CTG_IS_META_CTABLE(pMeta->metaType) || CTG_IS_META_BOTH(pMeta->metaType)) {
    // The cache gets its own heap copy of the embedded child-table meta.
    SCTableMeta *ctbCopy = taosMemoryMalloc(sizeof(SCTableMeta));
    if (!ctbCopy) {
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }
    memcpy(ctbCopy, &pMeta->ctbMeta, sizeof(SCTableMeta));
    CTG_ERR_JRET(ctgWriteTbMetaToCache(pCtg, dbCache, pMeta->dbFName, pMeta->dbId, pMeta->ctbName,
                                       (STableMeta *)ctbCopy, sizeof(SCTableMeta)));
  }

_return:

  taosMemoryFreeClear(pMeta->tbMeta);
  taosMemoryFreeClear(pMeta);

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
1788
/*
 * Cache operation handler: remove a super table from the cache.
 *
 * The suid is removed from the database's stbCache, the table entry is
 * released and removed from tbCache, and finally the suid is removed from the
 * catalog's stbRent. Nothing happens when the catalog has stopped updating,
 * the database is not cached, or the message names a non-zero dbId that
 * differs from the cached one. The message is always freed.
 *
 * Returns 0, or the error reported by ctgMetaRentRemove.
 */
int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) {
  int32_t             code = 0;
  SCtgDropStbMetaMsg *msg = operation->data;
  SCatalog           *pCtg = msg->pCtg;
  SCtgDBCache        *dbCache = NULL;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  ctgGetDBCache(pCtg, msg->dbFName, &dbCache);
  if (!dbCache) {
    goto _return;
  }

  if (0 != msg->dbId && msg->dbId != dbCache->dbId) {
    ctgDebug("dbId already modified, dbFName:%s, current:0x%" PRIx64 ", dbId:0x%" PRIx64 ", stb:%s, suid:0x%" PRIx64,
             msg->dbFName, dbCache->dbId, msg->dbId, msg->stbName, msg->suid);
    goto _return;
  }

  // Step 1: suid lookup table. A missing entry is only logged.
  if (0 == taosHashRemove(dbCache->stbCache, &msg->suid, sizeof(msg->suid))) {
    CTG_CACHE_STAT_DEC(numOfStb, 1);
  } else {
    ctgDebug("stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:0x%" PRIx64, msg->dbFName,
             msg->stbName, msg->suid);
  }

  // Step 2: table entry, keyed by table name.
  size_t       stbNameLen = strlen(msg->stbName);
  SCtgTbCache *pTbCache = taosHashGet(dbCache->tbCache, msg->stbName, stbNameLen);
  if (!pTbCache) {
    ctgDebug("stb %s already not in cache", msg->stbName);
    goto _return;
  }

  // Release the entry's contents under its meta write lock before the entry
  // itself is dropped from the hash.
  CTG_LOCK(CTG_WRITE, &pTbCache->metaLock);
  ctgFreeTbCacheImpl(pTbCache);
  CTG_UNLOCK(CTG_WRITE, &pTbCache->metaLock);

  if (0 == taosHashRemove(dbCache->tbCache, msg->stbName, stbNameLen)) {
    CTG_CACHE_STAT_DEC(numOfTbl, 1);
  } else {
    ctgError("stb not exist in cache, dbFName:%s, stb:%s, suid:0x%" PRIx64, msg->dbFName, msg->stbName, msg->suid);
  }

  ctgInfo("stb removed from cache, dbFName:%s, stbName:%s, suid:0x%" PRIx64, msg->dbFName, msg->stbName, msg->suid);

  // Step 3: rent list.
  CTG_ERR_JRET(ctgMetaRentRemove(&msg->pCtg->stbRent, msg->suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare));

  ctgDebug("stb removed from rent, dbFName:%s, stbName:%s, suid:0x%" PRIx64, msg->dbFName, msg->stbName, msg->suid);

_return:

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
1845
/*
 * Cache operation handler: remove one table entry from a database's tbCache.
 *
 * Nothing happens when the catalog has stopped updating, the database is not
 * cached, the cached dbId differs from the message's dbId, or the table is not
 * cached. The message is always freed.
 *
 * Returns 0, or TSDB_CODE_CTG_INTERNAL_ERROR when the entry was found but
 * could not be removed from the hash.
 */
int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) {
  int32_t             code = 0;
  SCtgDropTblMetaMsg *msg = operation->data;
  SCatalog           *pCtg = msg->pCtg;
  SCtgDBCache        *dbCache = NULL;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  ctgGetDBCache(pCtg, msg->dbFName, &dbCache);
  if (!dbCache) {
    goto _return;
  }

  // Unlike the db/stb drop handlers, the dbId must match exactly here.
  if (msg->dbId != dbCache->dbId) {
    ctgDebug("dbId 0x%" PRIx64 " not match with curId 0x%" PRIx64 ", dbFName:%s, tbName:%s", msg->dbId, dbCache->dbId,
             msg->dbFName, msg->tbName);
    goto _return;
  }

  size_t       tbNameLen = strlen(msg->tbName);
  SCtgTbCache *pTbCache = taosHashGet(dbCache->tbCache, msg->tbName, tbNameLen);
  if (!pTbCache) {
    ctgDebug("tb %s already not in cache", msg->tbName);
    goto _return;
  }

  // Release the entry's contents under its meta write lock before the entry
  // itself is dropped from the hash.
  CTG_LOCK(CTG_WRITE, &pTbCache->metaLock);
  ctgFreeTbCacheImpl(pTbCache);
  CTG_UNLOCK(CTG_WRITE, &pTbCache->metaLock);

  if (0 != taosHashRemove(dbCache->tbCache, msg->tbName, tbNameLen)) {
    ctgError("tb %s not exist in cache, dbFName:%s", msg->tbName, msg->dbFName);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  } else {
    CTG_CACHE_STAT_DEC(numOfTbl, 1);
  }

  ctgDebug("table %s removed from cache, dbFName:%s", msg->tbName, msg->dbFName);

_return:

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
1892
/*
 * Cache operation handler: store the user auth info carried by the message in
 * the catalog's userCache.
 *
 * - User not cached yet: a new entry is inserted that takes over the message's
 *   createdDbs/readDbs/writeDbs hashes.
 * - User already cached: version is overwritten and the three hashes are
 *   replaced under the entry's write lock.
 *
 * Hashes still attached to the message at _return are cleaned up there; the
 * message itself is always freed.
 *
 * Returns 0, or TSDB_CODE_OUT_OF_MEMORY when inserting a new entry fails.
 */
int32_t ctgOpUpdateUser(SCtgCacheOperation *operation) {
  int32_t            code = 0;
  SCtgUpdateUserMsg *msg = operation->data;
  SCatalog          *pCtg = msg->pCtg;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  SCtgUserAuth *pUser = (SCtgUserAuth *)taosHashGet(pCtg->userCache, msg->userAuth.user, strlen(msg->userAuth.user));
  if (pUser != NULL) {
    // NOTE(review): only version and the three db sets are refreshed for an
    // existing entry; superUser is set on first insert only - confirm intended.
    pUser->version = msg->userAuth.version;

    CTG_LOCK(CTG_WRITE, &pUser->lock);

    // Swap each set in and detach it from the message so the cleanup at
    // _return leaves the now-cached hashes alone.
    taosHashCleanup(pUser->createdDbs);
    pUser->createdDbs = msg->userAuth.createdDbs;
    msg->userAuth.createdDbs = NULL;

    taosHashCleanup(pUser->readDbs);
    pUser->readDbs = msg->userAuth.readDbs;
    msg->userAuth.readDbs = NULL;

    taosHashCleanup(pUser->writeDbs);
    pUser->writeDbs = msg->userAuth.writeDbs;
    msg->userAuth.writeDbs = NULL;

    CTG_UNLOCK(CTG_WRITE, &pUser->lock);
  } else {
    SCtgUserAuth newAuth = {.version = msg->userAuth.version,
                            .superUser = msg->userAuth.superAuth,
                            .createdDbs = msg->userAuth.createdDbs,
                            .readDbs = msg->userAuth.readDbs,
                            .writeDbs = msg->userAuth.writeDbs};

    if (0 != taosHashPut(pCtg->userCache, msg->userAuth.user, strlen(msg->userAuth.user), &newAuth, sizeof(newAuth))) {
      ctgError("taosHashPut user %s to cache failed", msg->userAuth.user);
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }

    // The stored entry now references the hashes; free only the message shell.
    taosMemoryFreeClear(msg);

    return TSDB_CODE_SUCCESS;
  }

_return:

  taosHashCleanup(msg->userAuth.createdDbs);
  taosHashCleanup(msg->userAuth.readDbs);
  taosHashCleanup(msg->userAuth.writeDbs);

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
1950
/*
 * Cache operation handler: replace the endpoint set of one cached vgroup.
 *
 * The update is ignored when the catalog has stopped updating, the database is
 * not cached, no vgroup info is cached for it, or the vgroup id is unknown.
 * The message is always freed.
 *
 * Returns 0, or the error reported by ctgGetDBCache / ctgWLockVgInfo.
 */
int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation) {
  int32_t             code = 0;
  SCtgUpdateEpsetMsg *msg = operation->data;
  SCatalog           *pCtg = msg->pCtg;
  SCtgDBCache        *dbCache = NULL;
  bool                vgLocked = false;  // true only while this function holds the vgInfo write lock

  if (pCtg->stopUpdate) {
    goto _return;
  }

  CTG_ERR_JRET(ctgGetDBCache(pCtg, msg->dbFName, &dbCache));
  if (NULL == dbCache) {
    ctgDebug("db %s not exist, ignore epset update", msg->dbFName);
    goto _return;
  }

  CTG_ERR_JRET(ctgWLockVgInfo(pCtg, dbCache));
  vgLocked = true;

  SDBVgInfo *vgInfo = dbCache->vgCache.vgInfo;
  if (NULL == vgInfo) {
    ctgDebug("vgroup in db %s not cached, ignore epset update", msg->dbFName);
    goto _return;
  }

  SVgroupInfo *pInfo = taosHashGet(vgInfo->vgHash, &msg->vgId, sizeof(msg->vgId));
  if (NULL == pInfo) {
    ctgDebug("no vgroup %d in db %s, ignore epset update", msg->vgId, msg->dbFName);
    goto _return;
  }

  SEp *pOrigEp = &pInfo->epSet.eps[pInfo->epSet.inUse];
  SEp *pNewEp = &msg->epSet.eps[msg->epSet.inUse];
  ctgDebug("vgroup %d epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d, dbFName:%s in ctg", pInfo->vgId,
           pInfo->epSet.inUse, pInfo->epSet.numOfEps, pOrigEp->fqdn, pOrigEp->port, msg->epSet.inUse,
           msg->epSet.numOfEps, pNewEp->fqdn, pNewEp->port, msg->dbFName);

  pInfo->epSet = msg->epSet;

_return:

  // Unlock only when the lock was actually taken. A failed ctgWLockVgInfo
  // lands here with dbCache set, and the other handlers in this file treat
  // that failure as "lock not held", so it must not be paired with an unlock.
  if (vgLocked) {
    ctgWUnlockVgInfo(dbCache);
  }

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
1999
/*
 * Cache operation handler: write the table index carried by the message into
 * the cache of its database (looked up or added with dbId 0).
 *
 * ctgWriteTbIndexToCache receives &pIndex and resets it to NULL once the cache
 * has taken the index over; an index still referenced at _return is destroyed
 * here. The message is always freed.
 *
 * Returns 0 on success or when the catalog has stopped updating, otherwise an
 * error code.
 */
int32_t ctgOpUpdateTbIndex(SCtgCacheOperation *operation) {
  int32_t               code = 0;
  SCtgUpdateTbIndexMsg *msg = operation->data;
  SCatalog             *pCtg = msg->pCtg;
  STableIndex          *pIndex = msg->pIndex;
  SCtgDBCache          *dbCache = NULL;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  CTG_ERR_JRET(ctgGetAddDBCache(pCtg, pIndex->dbFName, 0, &dbCache));
  // ctgGetAddDBCache can succeed and still hand back no cache (the other
  // callers in this file guard against it); the write below dereferences
  // dbCache, so bail out instead of crashing.
  if (NULL == dbCache) {
    ctgInfo("conflict db update, ignore this update, dbFName:%s", pIndex->dbFName);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  CTG_ERR_JRET(ctgWriteTbIndexToCache(pCtg, dbCache, pIndex->dbFName, pIndex->tbName, &pIndex));

_return:

  if (pIndex) {
    taosArrayDestroyEx(pIndex->pIndex, tFreeSTableIndexInfo);
    taosMemoryFreeClear(pIndex);
  }

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

/*
 * Cache operation handler: drop a table's cached index.
 *
 * The drop is done by writing a placeholder index (version -1, no index
 * array) for the table through ctgWriteTbIndexToCache, which replaces any
 * index currently cached for it. Nothing happens when the catalog has stopped
 * updating or the database is not cached. The message is always freed.
 *
 * Returns 0 on success or when nothing had to be done, otherwise an error code.
 */
int32_t ctgOpDropTbIndex(SCtgCacheOperation *operation) {
  int32_t             code = 0;
  SCtgDropTbIndexMsg *msg = operation->data;
  SCatalog           *pCtg = msg->pCtg;
  SCtgDBCache        *dbCache = NULL;
  // Declared and initialised up front: the early exits below jump to _return,
  // which inspects pIndex, so it must never be indeterminate there.
  STableIndex        *pIndex = NULL;

  if (pCtg->stopUpdate) {
    goto _return;
  }

  CTG_ERR_JRET(ctgGetDBCache(pCtg, msg->dbFName, &dbCache));
  if (NULL == dbCache) {
    // Nothing cached for this db; leave through _return so msg is released.
    goto _return;
  }

  pIndex = taosMemoryCalloc(1, sizeof(STableIndex));
  if (NULL == pIndex) {
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }
  strcpy(pIndex->tbName, msg->tbName);
  strcpy(pIndex->dbFName, msg->dbFName);
  pIndex->version = -1;

  CTG_ERR_JRET(ctgWriteTbIndexToCache(pCtg, dbCache, pIndex->dbFName, pIndex->tbName, &pIndex));

_return:

  // Still set here only if the cache did not take the placeholder over.
  if (pIndex) {
    taosArrayDestroyEx(pIndex->pIndex, tFreeSTableIndexInfo);
    taosMemoryFreeClear(pIndex);
  }

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

D
dapan1121 已提交
2063
/*
 * Cache operation handler: clear or free catalog handles while holding the
 * gCtgMgmt write lock.
 *
 * With a catalog in the message only that handle is processed; otherwise all
 * instances are. msg->freeCtg selects freeing (true) over clearing (false).
 * The message is always freed. Always returns 0.
 */
int32_t ctgOpClearCache(SCtgCacheOperation *operation) {
  int32_t            code = 0;
  SCtgClearCacheMsg *msg = operation->data;
  SCatalog          *pCtg = msg->pCtg;

  CTG_LOCK(CTG_WRITE, &gCtgMgmt.lock);

  if (NULL != pCtg) {
    // Single catalog.
    if (msg->freeCtg) {
      ctgFreeHandle(pCtg);
    } else {
      ctgClearHandle(pCtg);
    }
  } else if (msg->freeCtg) {
    ctgFreeAllInstance();
  } else {
    ctgClearAllInstance();
  }

  CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.lock);

  taosMemoryFreeClear(msg);

  CTG_RET(code);
}

2095 2096 2097 2098 2099 2100 2101 2102 2103 2104 2105 2106 2107 2108 2109 2110 2111 2112 2113 2114 2115 2116 2117 2118 2119 2120 2121 2122 2123 2124 2125 2126 2127 2128 2129 2130 2131 2132 2133 2134 2135 2136 2137 2138 2139 2140 2141 2142 2143 2144 2145 2146 2147
/*
 * Release the message attached to a cache operation that will not be executed.
 *
 * Payload objects owned by the message (vgroup info, table meta, user db
 * hashes, table index) are released first, then op->data itself is freed and
 * cleared. An unknown opId is only logged; op->data is left untouched then.
 * A NULL op or NULL op->data is a no-op.
 */
void ctgFreeCacheOperationData(SCtgCacheOperation *op) {
  if (!op || !op->data) {
    return;
  }

  // Stage 1: release whatever the message owns, per operation type.
  switch (op->opId) {
    case CTG_OP_UPDATE_VGROUP: {
      SCtgUpdateVgMsg *vgMsg = op->data;
      ctgFreeVgInfo(vgMsg->dbInfo);
      break;
    }
    case CTG_OP_UPDATE_TB_META: {
      SCtgUpdateTbMetaMsg *metaMsg = op->data;
      taosMemoryFreeClear(metaMsg->pMeta->tbMeta);
      taosMemoryFreeClear(metaMsg->pMeta);
      break;
    }
    case CTG_OP_UPDATE_USER: {
      SCtgUpdateUserMsg *userMsg = op->data;
      taosHashCleanup(userMsg->userAuth.createdDbs);
      taosHashCleanup(userMsg->userAuth.readDbs);
      taosHashCleanup(userMsg->userAuth.writeDbs);
      break;
    }
    case CTG_OP_UPDATE_TB_INDEX: {
      SCtgUpdateTbIndexMsg *indexMsg = op->data;
      if (indexMsg->pIndex) {
        taosArrayDestroyEx(indexMsg->pIndex->pIndex, tFreeSTableIndexInfo);
        taosMemoryFreeClear(indexMsg->pIndex);
      }
      break;
    }
    // These messages own nothing beyond their own allocation.
    case CTG_OP_DROP_DB_CACHE:
    case CTG_OP_DROP_DB_VGROUP:
    case CTG_OP_DROP_STB_META:
    case CTG_OP_DROP_TB_META:
    case CTG_OP_UPDATE_VG_EPSET:
    case CTG_OP_DROP_TB_INDEX:
    case CTG_OP_CLEAR_CACHE:
      break;
    default:
      qError("invalid cache op id:%d", op->opId);
      return;
  }

  // Stage 2: release the message itself.
  taosMemoryFreeClear(op->data);
}

D
dapan1121 已提交
2148
void ctgCleanupCacheQueue(void) {
dengyihao's avatar
dengyihao 已提交
2149 2150
  SCtgQNode          *node = NULL;
  SCtgQNode          *nodeNext = NULL;
D
dapan1121 已提交
2151
  SCtgCacheOperation *op = NULL;
dengyihao's avatar
dengyihao 已提交
2152
  bool                stopQueue = false;
D
dapan1121 已提交
2153 2154 2155 2156 2157

  while (true) {
    node = gCtgMgmt.queue.head->next;
    while (node) {
      if (node->op) {
D
dapan1121 已提交
2158 2159 2160 2161 2162 2163
        op = node->op;
        if (op->stopQueue) {
          SCatalog *pCtg = ((SCtgUpdateMsgHeader *)op->data)->pCtg;
          ctgDebug("process [%s] operation", gCtgCacheOperation[op->opId].name);
          (*gCtgCacheOperation[op->opId].func)(op);
          stopQueue = true;
dengyihao's avatar
dengyihao 已提交
2164
          CTG_RT_STAT_INC(numOfOpDequeue, 1);
D
dapan1121 已提交
2165
        } else {
2166
          ctgFreeCacheOperationData(op);
dengyihao's avatar
dengyihao 已提交
2167
          CTG_RT_STAT_INC(numOfOpAbort, 1);
D
dapan1121 已提交
2168
        }
dengyihao's avatar
dengyihao 已提交
2169

D
dapan1121 已提交
2170 2171
        if (op->syncOp) {
          tsem_post(&op->rspSem);
D
dapan1121 已提交
2172
        } else {
D
dapan1121 已提交
2173
          taosMemoryFree(op);
D
dapan1121 已提交
2174
        }
D
dapan1121 已提交
2175
      }
D
dapan1121 已提交
2176 2177 2178

      nodeNext = node->next;
      taosMemoryFree(node);
dengyihao's avatar
dengyihao 已提交
2179

D
dapan1121 已提交
2180
      node = nodeNext;
D
dapan1121 已提交
2181 2182
    }

D
dapan1121 已提交
2183
    if (!stopQueue) {
D
dapan1121 已提交
2184 2185 2186 2187
      taosUsleep(1);
    } else {
      break;
    }
D
dapan1121 已提交
2188 2189 2190 2191 2192 2193
  }

  taosMemoryFreeClear(gCtgMgmt.queue.head);
  gCtgMgmt.queue.tail = NULL;
}

dengyihao's avatar
dengyihao 已提交
2194
void *ctgUpdateThreadFunc(void *param) {
D
dapan1121 已提交
2195
  setThreadName("catalog");
2196

D
dapan1121 已提交
2197 2198 2199 2200 2201 2202
  qInfo("catalog update thread started");

  while (true) {
    if (tsem_wait(&gCtgMgmt.queue.reqSem)) {
      qError("ctg tsem_wait failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
    }
dengyihao's avatar
dengyihao 已提交
2203

2204
    if (atomic_load_8((int8_t *)&gCtgMgmt.queue.stopQueue)) {
D
dapan1121 已提交
2205
      ctgCleanupCacheQueue();
D
dapan1121 已提交
2206 2207 2208
      break;
    }

D
dapan1121 已提交
2209 2210 2211
    SCtgCacheOperation *operation = NULL;
    ctgDequeue(&operation);
    SCatalog *pCtg = ((SCtgUpdateMsgHeader *)operation->data)->pCtg;
D
dapan1121 已提交
2212

D
dapan1121 已提交
2213
    ctgDebug("process [%s] operation", gCtgCacheOperation[operation->opId].name);
dengyihao's avatar
dengyihao 已提交
2214

D
dapan1121 已提交
2215
    (*gCtgCacheOperation[operation->opId].func)(operation);
D
dapan1121 已提交
2216

D
dapan1121 已提交
2217
    if (operation->syncOp) {
D
dapan1121 已提交
2218
      tsem_post(&operation->rspSem);
D
dapan1121 已提交
2219 2220
    } else {
      taosMemoryFreeClear(operation);
D
dapan1121 已提交
2221 2222
    }

dengyihao's avatar
dengyihao 已提交
2223
    CTG_RT_STAT_INC(numOfOpDequeue, 1);
D
dapan1121 已提交
2224

D
dapan1121 已提交
2225
    ctgdShowCacheInfo();
D
dapan1121 已提交
2226 2227 2228
  }

  qInfo("catalog update thread stopped");
dengyihao's avatar
dengyihao 已提交
2229

D
dapan1121 已提交
2230 2231 2232 2233 2234 2235 2236 2237 2238 2239 2240 2241
  return NULL;
}

/*
 * Start the catalog cache update thread (joinable, running
 * ctgUpdateThreadFunc) and store its handle in gCtgMgmt.updateThread.
 *
 * The thread attribute object is destroyed on both the success and the
 * failure path.
 *
 * Returns TSDB_CODE_SUCCESS, or the system error derived from errno (also
 * stored in terrno) when the thread cannot be created.
 */
int32_t ctgStartUpdateThread() {
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);

  int32_t code = TSDB_CODE_SUCCESS;
  if (taosThreadCreate(&gCtgMgmt.updateThread, &thAttr, ctgUpdateThreadFunc, NULL) != 0) {
    // Capture the error first so the cleanup call below cannot disturb errno.
    code = TAOS_SYSTEM_ERROR(errno);
  }

  // The attribute object is only needed for creation; release it before any
  // return so the failure path does not leak it.
  taosThreadAttrDestroy(&thAttr);

  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    CTG_ERR_RET(terrno);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
2247
/*
 * Look a table's meta up in the cache.
 *
 * pCtg       - catalog handle
 * ctx        - lookup context; ctx->flag is updated in place (system-db flag,
 *              and the stb flag when it was still unknown after a miss)
 * pTableMeta - receives the cached meta, or NULL when the cache has no usable
 *              entry
 *
 * A cached meta is returned only if it matches the stb flag and either no
 * forced update is requested or the table belongs to a system database;
 * otherwise it is freed and *pTableMeta is reset to NULL.
 *
 * Returns TSDB_CODE_SUCCESS, or the error reported by ctgReadTbMetaFromCache.
 */
int32_t ctgGetTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta **pTableMeta) {
  if (IS_SYS_DBNAME(ctx->pName->dbname)) {
    CTG_FLAG_SET_SYS_DB(ctx->flag);
  }

  CTG_ERR_RET(ctgReadTbMetaFromCache(pCtg, ctx, pTableMeta));

  if (NULL != *pTableMeta) {
    if (CTG_FLAG_MATCH_STB(ctx->flag, (*pTableMeta)->tableType)) {
      if (CTG_FLAG_IS_SYS_DB(ctx->flag) || !CTG_FLAG_IS_FORCE_UPDATE(ctx->flag)) {
        return TSDB_CODE_SUCCESS;
      }
    }

    // Cached entry exists but must not be used: discard it.
    taosMemoryFreeClear(*pTableMeta);
    *pTableMeta = NULL;
  }

  if (CTG_FLAG_IS_UNKNOWN_STB(ctx->flag)) {
    CTG_FLAG_SET_STB(ctx->flag, ctx->tbInfo.tbType);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
2271
/*
 * Disabled (compiled out by "#if 0") batch lookup of table metas.
 *
 * For every name in ctx->pNames the table meta is read from the cache. A
 * usable hit is stored in the SMetaRes pushed to ctx->pResList; for a miss an
 * empty SMetaRes is pushed and an SCtgFetch describing the table is appended
 * to ctx->pFetchs (created on first use). When no fetch was recorded at all,
 * ctx->pResList is swapped into *pResList.
 */
#if 0
2272
int32_t ctgGetTbMetaBFromCache(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgTbMetasCtx* ctx, SArray** pResList) {
D
dapan1121 已提交
2273 2274 2275 2276 2277 2278 2279 2280 2281 2282 2283 2284 2285 2286 2287 2288 2289 2290 2291 2292 2293 2294 2295 2296 2297 2298 2299 2300 2301 2302 2303 2304 2305 2306 2307 2308 2309
  int32_t tbNum = taosArrayGetSize(ctx->pNames);
  SName* fName = taosArrayGet(ctx->pNames, 0);
  int32_t fIdx = 0;
  
  for (int32_t i = 0; i < tbNum; ++i) {
    SName* pName = taosArrayGet(ctx->pNames, i);
    SCtgTbMetaCtx nctx = {0};
    nctx.flag = CTG_FLAG_UNKNOWN_STB;
    nctx.pName = pName;
    
    if (IS_SYS_DBNAME(pName->dbname)) {
      CTG_FLAG_SET_SYS_DB(nctx.flag);
    }

    STableMeta *pTableMeta = NULL;
    CTG_ERR_RET(ctgReadTbMetaFromCache(pCtg, &nctx, &pTableMeta));
    SMetaRes res = {0};
    
    if (pTableMeta) {
      if (CTG_FLAG_MATCH_STB(nctx.flag, pTableMeta->tableType) &&
          ((!CTG_FLAG_IS_FORCE_UPDATE(nctx.flag)) || (CTG_FLAG_IS_SYS_DB(nctx.flag)))) {
        res.pRes = pTableMeta;
      } else {
        taosMemoryFreeClear(pTableMeta);
      }
    }

    if (NULL == res.pRes) {
      if (NULL == ctx->pFetchs) {
        ctx->pFetchs = taosArrayInit(tbNum, sizeof(SCtgFetch));
      }
      
      if (CTG_FLAG_IS_UNKNOWN_STB(nctx.flag)) {
        CTG_FLAG_SET_STB(nctx.flag, nctx.tbInfo.tbType);
      }

      SCtgFetch fetch = {0};
2310
      fetch.tbIdx = i;
D
dapan1121 已提交
2311 2312 2313 2314 2315 2316
      fetch.fetchIdx = fIdx++;
      fetch.flag = nctx.flag;

      taosArrayPush(ctx->pFetchs, &fetch);
    }
    
2317
    taosArrayPush(ctx->pResList, &res);
D
dapan1121 已提交
2318 2319 2320
  }

  if (NULL == ctx->pFetchs) {
2321
    TSWAP(*pResList, ctx->pResList);
D
dapan1121 已提交
2322 2323 2324 2325
  }

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
2326 2327
#endif

dengyihao's avatar
dengyihao 已提交
2328 2329 2330 2331 2332 2333 2334 2335 2336
/**
 * Resolve the metas of the tables listed in pList from the catalog cache.
 *
 * The DB name is derived from the first entry of pList and used for every lookup.
 * For table i exactly one slot is added to ctx->pResList:
 *   - cache hit : an SMetaRes whose pRes is a heap copy of the table meta is pushed;
 *   - cache miss: a fetch request is recorded through ctgAddFetch (result index
 *                 baseResIdx + i) and ctx->pResList is grown by one slot.
 *
 * For a child table the SCTableMeta part is copied from its own cache entry and the
 * remaining part (from `sversion` on) is copied from the super table's cache entry,
 * which is located through dbCache->stbCache by suid. When the previous fully resolved
 * child table has the same suid, its meta is cloned instead of repeating that lookup.
 *
 * @param pCtg       catalog handle
 * @param pConn      not used by this function
 * @param ctx        batch context; pFetchs and pResList are appended to
 * @param dbIdx      forwarded to ctgAddFetch
 * @param fetchIdx   forwarded to ctgAddFetch
 * @param baseResIdx result index of the first table in pList
 * @param pList      array of SName
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY (via CTG_ERR_RET) when a copy
 *         cannot be allocated.
 */
int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMetasCtx *ctx, int32_t dbIdx,
                               int32_t *fetchIdx, int32_t baseResIdx, SArray *pList) {
  int32_t tbNum = taosArrayGetSize(pList);
  if (tbNum <= 0) {
    // nothing to look up; also avoids reading element 0 of an empty list below
    return TSDB_CODE_SUCCESS;
  }

  SName      *pName = taosArrayGet(pList, 0);
  char        dbFName[TSDB_DB_FNAME_LEN] = {0};
  int32_t     flag = CTG_FLAG_UNKNOWN_STB;
  uint64_t    lastSuid = 0;
  STableMeta *lastTableMeta = NULL;

  if (IS_SYS_DBNAME(pName->dbname)) {
    CTG_FLAG_SET_SYS_DB(flag);
    strcpy(dbFName, pName->dbname);
  } else {
    tNameGetFullDbName(pName, dbFName);
  }

  SCtgDBCache *dbCache = NULL;
  SCtgTbCache *pCache = NULL;
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);

  if (NULL == dbCache) {
    // the whole DB is missing from the cache: every table has to be fetched
    ctgDebug("db %s not in cache", dbFName);
    for (int32_t i = 0; i < tbNum; ++i) {
      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);
    }

    return TSDB_CODE_SUCCESS;
  }

  for (int32_t i = 0; i < tbNum; ++i) {
    SName *pTbName = taosArrayGet(pList, i);

    pCache = taosHashAcquire(dbCache->tbCache, pTbName->tname, strlen(pTbName->tname));
    if (NULL == pCache) {
      ctgDebug("tb %s not in cache, dbFName:%s", pTbName->tname, dbFName);
      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);

      continue;
    }

    CTG_LOCK(CTG_READ, &pCache->metaLock);
    if (NULL == pCache->pMeta) {
      // drop the read lock and the hash reference before moving on, otherwise both leak
      CTG_UNLOCK(CTG_READ, &pCache->metaLock);
      taosHashRelease(dbCache->tbCache, pCache);

      ctgDebug("tb %s meta not in cache, dbFName:%s", pTbName->tname, dbFName);
      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);

      continue;
    }

    STableMeta *tbMeta = pCache->pMeta;

    // snapshot what is needed later; tbMeta must not be read once the lock is released
    SCtgTbMetaCtx nctx = {0};
    nctx.flag = flag;
    nctx.tbInfo.inCache = true;
    nctx.tbInfo.dbId = dbCache->dbId;
    nctx.tbInfo.suid = tbMeta->suid;
    nctx.tbInfo.tbType = tbMeta->tableType;

    SMetaRes    res = {0};
    STableMeta *pTableMeta = NULL;
    if (tbMeta->tableType != TSDB_CHILD_TABLE) {
      // non-child table: the cache entry holds the complete meta, copy it as a whole
      int32_t metaSize = CTG_META_SIZE(tbMeta);
      pTableMeta = taosMemoryCalloc(1, metaSize);
      if (NULL == pTableMeta) {
        // NOTE(review): presumably drops the meta lock, the table ref and the DB cache ref - confirm
        ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
        CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
      }

      memcpy(pTableMeta, tbMeta, metaSize);

      CTG_UNLOCK(CTG_READ, &pCache->metaLock);
      taosHashRelease(dbCache->tbCache, pCache);

      ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pTbName->tname, nctx.tbInfo.tbType, dbFName);

      res.pRes = pTableMeta;
      taosArrayPush(ctx->pResList, &res);

      continue;
    }

    // PROCESS FOR CHILD TABLE

    if (lastSuid && tbMeta->suid == lastSuid && lastTableMeta) {
      // same super table as the previous child table: clone that meta and overwrite
      // only the child specific SCTableMeta prefix
      cloneTableMeta(lastTableMeta, &pTableMeta);
      if (NULL == pTableMeta) {
        // lastTableMeta is non-NULL here, so no clone means the allocation failed
        ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
        CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
      }

      memcpy(pTableMeta, tbMeta, sizeof(SCTableMeta));

      CTG_UNLOCK(CTG_READ, &pCache->metaLock);
      taosHashRelease(dbCache->tbCache, pCache);

      ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pTbName->tname, nctx.tbInfo.tbType, dbFName);

      res.pRes = pTableMeta;
      taosArrayPush(ctx->pResList, &res);

      continue;
    }

    // first copy only the child table part; the rest comes from the super table below
    int32_t metaSize = sizeof(SCTableMeta);
    pTableMeta = taosMemoryCalloc(1, metaSize);
    if (NULL == pTableMeta) {
      ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    memcpy(pTableMeta, tbMeta, metaSize);

    CTG_UNLOCK(CTG_READ, &pCache->metaLock);
    taosHashRelease(dbCache->tbCache, pCache);

    ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", pTbName->tname,
             nctx.tbInfo.tbType, dbFName);

    // suid -> super table name
    char *stName = taosHashAcquire(dbCache->stbCache, &pTableMeta->suid, sizeof(pTableMeta->suid));
    if (NULL == stName) {
      ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", pTableMeta->suid, dbFName);
      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);

      taosMemoryFreeClear(pTableMeta);
      continue;
    }

    // super table name -> super table cache entry
    pCache = taosHashAcquire(dbCache->tbCache, stName, strlen(stName));
    if (NULL == pCache) {
      ctgDebug("stb 0x%" PRIx64 " name %s not in cache, dbFName:%s", pTableMeta->suid, stName, dbFName);
      taosHashRelease(dbCache->stbCache, stName);

      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);

      taosMemoryFreeClear(pTableMeta);
      continue;
    }

    taosHashRelease(dbCache->stbCache, stName);

    CTG_LOCK(CTG_READ, &pCache->metaLock);
    if (NULL == pCache->pMeta) {
      ctgDebug("stb 0x%" PRIx64 " meta not in cache, dbFName:%s", pTableMeta->suid, dbFName);
      CTG_UNLOCK(CTG_READ, &pCache->metaLock);
      taosHashRelease(dbCache->tbCache, pCache);

      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);

      taosMemoryFreeClear(pTableMeta);

      continue;
    }

    STableMeta *stbMeta = pCache->pMeta;
    if (stbMeta->suid != nctx.tbInfo.suid) {
      // log before releasing: stbMeta points into the cache entry
      ctgError("stb suid 0x%" PRIx64 " in stbCache mis-match, expected suid 0x%" PRIx64, stbMeta->suid,
               nctx.tbInfo.suid);

      CTG_UNLOCK(CTG_READ, &pCache->metaLock);
      taosHashRelease(dbCache->tbCache, pCache);

      ctgAddFetch(&ctx->pFetchs, dbIdx, i, fetchIdx, baseResIdx + i, flag);
      taosArraySetSize(ctx->pResList, taosArrayGetSize(ctx->pResList) + 1);

      taosMemoryFreeClear(pTableMeta);

      continue;
    }

    // grow the copy to the full meta size; keep the old pointer until the realloc
    // is known to have succeeded so that it can still be freed on failure
    metaSize = CTG_META_SIZE(stbMeta);
    STableMeta *pFullMeta = taosMemoryRealloc(pTableMeta, metaSize);
    if (NULL == pFullMeta) {
      taosMemoryFreeClear(pTableMeta);
      ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    pTableMeta = pFullMeta;

    // everything after the SCTableMeta prefix is taken from the super table
    memcpy(&pTableMeta->sversion, &stbMeta->sversion, metaSize - sizeof(SCTableMeta));

    CTG_UNLOCK(CTG_READ, &pCache->metaLock);
    taosHashRelease(dbCache->tbCache, pCache);

    res.pRes = pTableMeta;
    taosArrayPush(ctx->pResList, &res);

    // remember this meta so following child tables of the same stb can clone it
    lastSuid = pTableMeta->suid;
    lastTableMeta = pTableMeta;
  }

  ctgReleaseDBCache(pCtg, dbCache);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2521
/**
 * Schedule removal of one table's meta from the catalog cache.
 *
 * The meta is first read from the cache. When there is none the function returns
 * TSDB_CODE_SUCCESS right away. Otherwise a drop request is enqueued:
 * ctgDropStbMetaEnqueue for a super table, ctgDropTbMetaEnqueue for any other type.
 * The meta copy obtained from the cache is freed before returning.
 *
 * @param pCtg        catalog handle
 * @param pTableName  name of the table to remove
 * @param syncReq     forwarded to the enqueue function
 * @return TSDB_CODE_SUCCESS, or the error code of the failing step.
 */
int32_t ctgRemoveTbMetaFromCache(SCatalog *pCtg, SName *pTableName, bool syncReq) {
  int32_t       code = 0;
  char          dbFName[TSDB_DB_FNAME_LEN];
  STableMeta   *pCached = NULL;
  SCtgTbMetaCtx readCtx = {0};

  readCtx.pName = pTableName;
  readCtx.flag = CTG_FLAG_UNKNOWN_STB;

  CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &readCtx, &pCached));

  if (NULL == pCached) {
    ctgDebug("table already not in cache, db:%s, tblName:%s", pTableName->dbname, pTableName->tname);
    return TSDB_CODE_SUCCESS;
  }

  tNameGetFullDbName(pTableName, dbFName);

  if (TSDB_SUPER_TABLE != pCached->tableType) {
    CTG_ERR_JRET(ctgDropTbMetaEnqueue(pCtg, dbFName, readCtx.tbInfo.dbId, pTableName->tname, syncReq));
  } else {
    CTG_ERR_JRET(ctgDropStbMetaEnqueue(pCtg, dbFName, readCtx.tbInfo.dbId, pTableName->tname, pCached->suid, syncReq));
  }

_return:

  taosMemoryFreeClear(pCached);

  CTG_RET(code);
}

/**
 * Look up the vgroup a table hashes to, using only cached vgroup info.
 *
 * @param pCtg        catalog handle
 * @param pTableName  table name; system DB names are rejected with
 *                    TSDB_CODE_CTG_INVALID_INPUT
 * @param pVgroup     out: set to NULL when the DB's vgroup info is not cached, otherwise
 *                    to a newly allocated SVgroupInfo filled by ctgGetVgInfoFromHashValue.
 *                    On failure after allocation the buffer is freed again.
 * @return TSDB_CODE_SUCCESS (also on cache miss), TSDB_CODE_OUT_OF_MEMORY when the
 *         result cannot be allocated, or the error code of the failing step.
 */
int32_t ctgGetTbHashVgroupFromCache(SCatalog *pCtg, const SName *pTableName, SVgroupInfo **pVgroup) {
  if (IS_SYS_DBNAME(pTableName->dbname)) {
    ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname);
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  SCtgDBCache *dbCache = NULL;
  int32_t      code = 0;
  char         dbFName[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, dbFName);

  CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache));

  if (NULL == dbCache) {
    *pVgroup = NULL;
    return TSDB_CODE_SUCCESS;
  }

  *pVgroup = taosMemoryCalloc(1, sizeof(SVgroupInfo));
  if (NULL == *pVgroup) {
    // never hand a NULL output buffer to ctgGetVgInfoFromHashValue
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pTableName, *pVgroup));

_return:

  if (dbCache) {
    ctgReleaseVgInfoToCache(pCtg, dbCache);
  }

  if (code) {
    taosMemoryFreeClear(*pVgroup);
  }

  CTG_RET(code);
}