catalog.c 55.1 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
H
Haojun Liao 已提交
14 15
 */

D
dapan1121 已提交
16
#include "trpc.h"
D
dapan1121 已提交
17
#include "query.h"
D
dapan1121 已提交
18
#include "tname.h"
H
Haojun Liao 已提交
19
#include "catalogInt.h"
20

D
dapan1121 已提交
21 22
SCatalogMgmt ctgMgmt = {0};

D
dapan1121 已提交
23
SCtgDebug gCTGDebug = {0};
24

D
dapan1121 已提交
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
void ctgShowDBCache(SHashObj *dbHash) {
  if (NULL == dbHash) {
    return;
  }

  int32_t i = 0;
  SCtgDBCache *dbCache = NULL;
  void *pIter = taosHashIterate(dbHash, NULL);
  while (pIter) {
    char *dbFName = NULL;
    size_t len = 0;
    
    dbCache = (SCtgDBCache *)pIter;

    taosHashGetKey(dbCache, &dbFName, &len);
    
    CTG_CACHE_DEBUG("** %dth db [%.*s] **", i, len, dbFName);
    
    pIter = taosHashIterate(dbHash, pIter);
  }
}

void ctgShowClusterCache(struct SCatalog* pCatalog) {
  if (NULL == pCatalog) {
    return;
  }

  CTG_CACHE_DEBUG("## cluster %"PRIx64" cache Info ##", pCatalog->clusterId);
  CTG_CACHE_DEBUG("db cache number:%d", pCatalog->dbCache ? taosHashGetSize(pCatalog->dbCache) : 0);    
  ctgShowDBCache(pCatalog->dbCache);

}

int32_t ctgInitDBCache(struct SCatalog* pCatalog) {
  if (NULL == pCatalog->dbCache) {
    SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
    if (NULL == cache) {
      ctgError("taosHashInit %d failed", CTG_DEFAULT_CACHE_DB_NUMBER);
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }

    if (NULL != atomic_val_compare_exchange_ptr(&pCatalog->dbCache, NULL, cache)) {
      taosHashCleanup(cache);
    }
  }

  return TSDB_CODE_SUCCESS;
}


int32_t ctgInitTbMetaCache(struct SCatalog* pCatalog, SCtgDBCache *dbCache) {
  if (NULL == dbCache->tbCache.metaCache) {
    if (dbCache->deleted) {
      ctgInfo("db is dropping, dbId:%"PRIx64, dbCache->dbId);
      CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
    }

    SHashObj *metaCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
    if (NULL == metaCache) {
      ctgError("taosHashInit failed, num:%d", ctgMgmt.cfg.maxTblCacheNum);
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }
    
    if (NULL != atomic_val_compare_exchange_ptr(&dbCache->tbCache.metaCache, NULL, metaCache)) {
      taosHashCleanup(metaCache);
    }
  }

  return TSDB_CODE_SUCCESS;
}

int32_t ctgInitStbCache(struct SCatalog* pCatalog, SCtgDBCache *dbCache) {
  if (NULL == dbCache->tbCache.stbCache) {
    if (dbCache->deleted) {
      ctgInfo("db is dropping, dbId:%"PRIx64, dbCache->dbId);
      CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
    }
  
    SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
    if (NULL == cache) {
      ctgError("taosHashInit failed, num:%d", ctgMgmt.cfg.maxTblCacheNum);
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }

    if (NULL != atomic_val_compare_exchange_ptr(&dbCache->tbCache.stbCache, NULL, cache)) {
      taosHashCleanup(cache);
    }
  }

  return TSDB_CODE_SUCCESS;
}


118

D
dapan1121 已提交
119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
void ctgFreeMetaRent(SCtgRentMgmt *mgmt) {
  if (NULL == mgmt->slots) {
    return;
  }

  for (int32_t i = 0; i < mgmt->slotNum; ++i) {
    SCtgRentSlot *slot = &mgmt->slots[i];
    if (slot->meta) {
      taosArrayDestroy(slot->meta);
      slot->meta = NULL;
    }
  }

  tfree(mgmt->slots);
}


D
dapan1121 已提交
136 137 138 139 140
void ctgFreeTableMetaCache(SCtgTbMetaCache *cache) {
  CTG_LOCK(CTG_WRITE, &cache->stbLock);
  if (cache->stbCache) {
    taosHashCleanup(cache->stbCache);
    cache->stbCache = NULL;
D
dapan1121 已提交
141
  }
D
dapan1121 已提交
142
  CTG_UNLOCK(CTG_WRITE, &cache->stbLock);
D
dapan1121 已提交
143

D
dapan1121 已提交
144 145 146 147
  CTG_LOCK(CTG_WRITE, &cache->metaLock);
  if (cache->metaCache) {
    taosHashCleanup(cache->metaCache);
    cache->metaCache = NULL;
D
dapan1121 已提交
148
  }
D
dapan1121 已提交
149
  CTG_UNLOCK(CTG_WRITE, &cache->metaLock);
D
dapan1121 已提交
150 151 152 153 154 155 156 157 158
}

void ctgFreeDbCache(SCtgDBCache *dbCache) {
  if (NULL == dbCache) {
    return;
  }

  atomic_store_8(&dbCache->deleted, 1);

D
dapan1121 已提交
159
  CTG_LOCK(CTG_WRITE, &dbCache->vgLock);
D
dapan1121 已提交
160 161 162 163 164 165 166 167 168
  if (dbCache->vgInfo) {

    if (dbCache->vgInfo->vgHash) {
      taosHashCleanup(dbCache->vgInfo->vgHash);
      dbCache->vgInfo->vgHash = NULL;
    }

    tfree(dbCache->vgInfo);
  }
D
dapan1121 已提交
169
  CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
D
dapan1121 已提交
170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193

  ctgFreeTableMetaCache(&dbCache->tbCache);
}


void ctgFreeHandle(struct SCatalog* pCatalog) {
  ctgFreeMetaRent(&pCatalog->dbRent);
  ctgFreeMetaRent(&pCatalog->stbRent);
  if (pCatalog->dbCache) {
    void *pIter = taosHashIterate(pCatalog->dbCache, NULL);
    while (pIter) {
      SCtgDBCache *dbCache = pIter;

      ctgFreeDbCache(dbCache);
      
      pIter = taosHashIterate(pCatalog->dbCache, pIter);
    }  

    taosHashCleanup(pCatalog->dbCache);
  }
  
  free(pCatalog);
}

D
dapan1121 已提交
194
int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbFName, SCtgDBCache **dbCache, bool *inCache) {
D
dapan1121 已提交
195
  if (NULL == pCatalog->dbCache) {
D
dapan1121 已提交
196
    *inCache = false;
D
dapan1121 已提交
197
    ctgWarn("empty db cache, dbFName:%s", dbFName);
D
dapan1121 已提交
198 199 200
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
201
  SCtgDBCache *cache = NULL;
D
dapan1121 已提交
202

D
dapan1121 已提交
203
  while (true) {
D
dapan1121 已提交
204
    cache = taosHashAcquire(pCatalog->dbCache, dbFName, strlen(dbFName));
D
dapan1121 已提交
205

D
dapan1121 已提交
206
    if (NULL == cache) {
D
dapan1121 已提交
207
      *inCache = false;
D
dapan1121 已提交
208
      ctgWarn("not in db vgroup cache, dbFName:%s", dbFName);
D
dapan1121 已提交
209 210 211
      return TSDB_CODE_SUCCESS;
    }

D
dapan1121 已提交
212 213 214 215
    CTG_LOCK(CTG_READ, &cache->vgLock);
    if (NULL == cache->vgInfo) {
      CTG_UNLOCK(CTG_READ, &cache->vgLock);
      taosHashRelease(pCatalog->dbCache, cache);
D
dapan1121 已提交
216
      ctgWarn("db cache vgInfo is NULL, dbFName:%s", dbFName);
D
dapan1121 已提交
217 218 219 220 221
      
      continue;
    }

    break;
D
dapan1121 已提交
222
  }
D
dapan1121 已提交
223

D
dapan1121 已提交
224
  *dbCache = cache;
D
dapan1121 已提交
225
  *inCache = true;
D
dapan1121 已提交
226

D
dapan1121 已提交
227
  ctgDebug("Got db vgroup from cache, dbFName:%s", dbFName);
D
dapan1121 已提交
228 229 230 231 232 233 234 235 236 237
  
  return TSDB_CODE_SUCCESS;
}



int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SBuildUseDBInput *input, SUseDbOutput *out) {
  char *msg = NULL;
  int32_t msgLen = 0;

D
dapan1121 已提交
238 239 240 241 242 243 244
  ctgDebug("try to get db vgroup from mnode, db:%s", input->db);

  int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)](input, &msg, 0, &msgLen);
  if (code) {
    ctgError("Build use db msg failed, code:%x, db:%s", code, input->db);
    CTG_ERR_RET(code);
  }
D
ut test  
dapan1121 已提交
245
  
D
dapan1121 已提交
246
  SRpcMsg rpcMsg = {
H
Hongze Cheng 已提交
247
      .msgType = TDMT_MND_USE_DB,
D
catalog  
dapan1121 已提交
248
      .pCont   = msg,
D
dapan1121 已提交
249 250 251 252 253 254
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};

  rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
255
  if (TSDB_CODE_SUCCESS != rpcRsp.code) {
H
Haojun Liao 已提交
256
    ctgError("error rsp for use db, code:%s, db:%s", tstrerror(rpcRsp.code), input->db);
D
dapan1121 已提交
257
    CTG_ERR_RET(rpcRsp.code);
D
dapan1121 已提交
258
  }
D
dapan1121 已提交
259

D
dapan1121 已提交
260 261 262 263 264
  code = queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)](out, rpcRsp.pCont, rpcRsp.contLen);
  if (code) {
    ctgError("Process use db rsp failed, code:%x, db:%s", code, input->db);
    CTG_ERR_RET(code);
  }
D
dapan1121 已提交
265

D
dapan1121 已提交
266 267
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
268

D
dapan1121 已提交
269 270
int32_t ctgIsTableMetaExistInCache(struct SCatalog* pCatalog, char *dbFName, char* tbName, int32_t *exist) {
  if (NULL == pCatalog->dbCache) {
D
dapan1121 已提交
271
    *exist = 0;
D
dapan1121 已提交
272 273 274 275 276 277 278 279
    ctgWarn("empty db cache, dbFName:%s, tbName:%s", dbFName, tbName);
    return TSDB_CODE_SUCCESS;
  }

  SCtgDBCache *dbCache = taosHashAcquire(pCatalog->dbCache, dbFName, strlen(dbFName));
  if (NULL == dbCache) {
    *exist = 0;
    ctgWarn("db not exist in cache, dbFName:%s", dbFName);
D
dapan1121 已提交
280 281 282 283
    return TSDB_CODE_SUCCESS;
  }


D
dapan1121 已提交
284
  size_t sz = 0;
D
dapan1121 已提交
285 286 287 288
  CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);  
  STableMeta *tbMeta = taosHashGet(dbCache->tbCache.metaCache, tbName, strlen(tbName));
  CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
  
D
dapan1121 已提交
289
  if (NULL == tbMeta) {
D
dapan1121 已提交
290 291
    taosHashRelease(pCatalog->dbCache, dbCache);
    
D
dapan1121 已提交
292
    *exist = 0;
D
dapan1121 已提交
293
    ctgDebug("tbmeta not in cache, dbFName:%s, tbName:%s", dbFName, tbName);
D
dapan1121 已提交
294 295 296 297
    return TSDB_CODE_SUCCESS;
  }

  *exist = 1;
D
dapan1121 已提交
298 299

  taosHashRelease(pCatalog->dbCache, dbCache);
D
dapan1121 已提交
300
  
D
dapan1121 已提交
301
  ctgDebug("tbmeta is in cache, dbFName:%s, tbName:%s", dbFName, tbName);
D
dapan1121 已提交
302 303 304 305
  
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
306

H
Haojun Liao 已提交
307
int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableName, STableMeta** pTableMeta, int32_t *exist) {
D
dapan1121 已提交
308
  if (NULL == pCatalog->dbCache) {
D
dapan1121 已提交
309
    *exist = 0;
D
dapan1121 已提交
310
    ctgWarn("empty tbmeta cache, tbName:%s", pTableName->tname);
D
dapan1121 已提交
311 312 313
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
314 315
  char db[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, db);
D
dapan1121 已提交
316

D
dapan1121 已提交
317 318
  *pTableMeta = NULL;

D
dapan1121 已提交
319 320 321 322 323 324
  SCtgDBCache *dbCache = taosHashAcquire(pCatalog->dbCache, db, strlen(db));
  if (NULL == dbCache) {
    *exist = 0;
    ctgWarn("no db cache, dbFName:%s, tbName:%s", db, pTableName->tname);
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
325

D
dapan1121 已提交
326
  if (NULL == dbCache->tbCache.metaCache) {
D
dapan1121 已提交
327 328 329 330 331 332
    *exist = 0;
    taosHashRelease(pCatalog->dbCache, dbCache);
    ctgWarn("empty tbmeta cache, dbFName:%s, tbName:%s", db, pTableName->tname);
    return TSDB_CODE_SUCCESS;
  }
  
D
dapan1121 已提交
333 334 335 336 337
  size_t sz = 0;  
  CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);
  STableMeta *tbMeta = taosHashGetCloneExt(dbCache->tbCache.metaCache, pTableName->tname, strlen(pTableName->tname), NULL, (void **)pTableMeta, &sz);
  CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);

D
dapan1121 已提交
338
  if (NULL == *pTableMeta) {
D
dapan1121 已提交
339
    *exist = 0;
D
dapan1121 已提交
340 341
    taosHashRelease(pCatalog->dbCache, dbCache);
    ctgDebug("tbmeta not in cache, dbFName:%s, tbName:%s", db, pTableName->tname);
D
dapan1121 已提交
342 343 344
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
345
  *exist = 1;
D
dapan1121 已提交
346 347
  
  tbMeta = *pTableMeta;
D
dapan1121 已提交
348

D
dapan1121 已提交
349
  if (tbMeta->tableType != TSDB_CHILD_TABLE) {
D
dapan 已提交
350
    taosHashRelease(pCatalog->dbCache, dbCache);
D
dapan1121 已提交
351
    ctgDebug("Got tbmeta from cache, type:%d, dbFName:%s, tbName:%s", tbMeta->tableType, db, pTableName->tname);
D
dapan1121 已提交
352 353 354
    return TSDB_CODE_SUCCESS;
  }
  
D
dapan1121 已提交
355
  CTG_LOCK(CTG_READ, &dbCache->tbCache.stbLock);
D
dapan1121 已提交
356
  
D
dapan1121 已提交
357
  STableMeta **stbMeta = taosHashGet(dbCache->tbCache.stbCache, &tbMeta->suid, sizeof(tbMeta->suid));
D
dapan1121 已提交
358
  if (NULL == stbMeta || NULL == *stbMeta) {
D
dapan1121 已提交
359
    CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
D
dapan 已提交
360
    taosHashRelease(pCatalog->dbCache, dbCache);
D
dapan1121 已提交
361
    ctgError("stable not in stbCache, suid:%"PRIx64, tbMeta->suid);
D
dapan1121 已提交
362 363 364 365
    tfree(*pTableMeta);
    *exist = 0;
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
366

D
dapan1121 已提交
367
  if ((*stbMeta)->suid != tbMeta->suid) {    
D
dapan1121 已提交
368
    CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
D
dapan 已提交
369
    taosHashRelease(pCatalog->dbCache, dbCache);
D
dapan1121 已提交
370
    tfree(*pTableMeta);
D
dapan1121 已提交
371
    ctgError("stable suid in stbCache mis-match, expected suid:%"PRIx64 ",actual suid:%"PRIx64, tbMeta->suid, (*stbMeta)->suid);
D
dapan1121 已提交
372 373
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }
D
dapan1121 已提交
374

D
dapan1121 已提交
375 376 377
  int32_t metaSize = sizeof(STableMeta) + ((*stbMeta)->tableInfo.numOfTags + (*stbMeta)->tableInfo.numOfColumns) * sizeof(SSchema);
  *pTableMeta = realloc(*pTableMeta, metaSize);
  if (NULL == *pTableMeta) {    
D
dapan1121 已提交
378
    CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
D
dapan 已提交
379
    taosHashRelease(pCatalog->dbCache, dbCache);
D
dapan1121 已提交
380
    ctgError("realloc size[%d] failed", metaSize);
D
dapan1121 已提交
381
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
382 383
  }

D
dapan1121 已提交
384 385
  memcpy(&(*pTableMeta)->sversion, &(*stbMeta)->sversion, metaSize - sizeof(SCTableMeta));

D
dapan1121 已提交
386
  CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
D
dapan1121 已提交
387

D
dapan 已提交
388 389
  taosHashRelease(pCatalog->dbCache, dbCache);

D
dapan1121 已提交
390
  ctgDebug("Got tbmeta from cache, dbFName:%s, tbName:%s", db, pTableName->tname);
D
dapan1121 已提交
391 392 393 394
  
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
395
int32_t ctgGetTableTypeFromCache(struct SCatalog* pCatalog, const SName* pTableName, int32_t *tbType) {
D
dapan1121 已提交
396 397
  if (NULL == pCatalog->dbCache) {
    ctgWarn("empty db cache, tbName:%s", pTableName->tname);  
D
dapan1121 已提交
398 399 400
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
401 402
  char dbName[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, dbName);
D
dapan1121 已提交
403

D
dapan1121 已提交
404 405 406 407 408
  SCtgDBCache *dbCache = taosHashAcquire(pCatalog->dbCache, dbName, strlen(dbName));
  if (NULL == dbCache) {
    ctgInfo("db not in cache, dbFName:%s", dbName);
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
409

D
dapan1121 已提交
410 411 412 413
  CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);
  STableMeta *pTableMeta = (STableMeta *)taosHashAcquire(dbCache->tbCache.metaCache, pTableName->tname, strlen(pTableName->tname));
  CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);

D
dapan1121 已提交
414
  if (NULL == pTableMeta) {
D
dapan1121 已提交
415 416 417
    ctgWarn("tbmeta not in cache, dbFName:%s, tbName:%s", dbName, pTableName->tname);  
    taosHashRelease(pCatalog->dbCache, dbCache);
    
D
dapan1121 已提交
418 419 420
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
421 422
  *tbType = atomic_load_8(&pTableMeta->tableType);

D
dapan1121 已提交
423
  taosHashRelease(dbCache->tbCache.metaCache, dbCache);
D
dapan1121 已提交
424
  taosHashRelease(pCatalog->dbCache, dbCache);
D
dapan1121 已提交
425

D
dapan1121 已提交
426
  ctgDebug("Got tbtype from cache, dbFName:%s, tbName:%s, type:%d", dbName, pTableName->tname, *tbType);  
D
dapan1121 已提交
427 428 429 430
  
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
431 432
int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, char *dbFName, char* tbName, STableMetaOutput* output) {
  SBuildTableMetaInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = tbName};
D
dapan1121 已提交
433 434 435 436
  char *msg = NULL;
  SEpSet *pVnodeEpSet = NULL;
  int32_t msgLen = 0;

D
dapan1121 已提交
437
  ctgDebug("try to get table meta from mnode, dbFName:%s, tbName:%s", dbFName, tbName);
D
dapan1121 已提交
438 439 440 441 442 443

  int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_MND_STB_META)](&bInput, &msg, 0, &msgLen);
  if (code) {
    ctgError("Build mnode stablemeta msg failed, code:%x", code);
    CTG_ERR_RET(code);
  }
D
dapan1121 已提交
444 445 446 447 448 449

  SRpcMsg rpcMsg = {
      .msgType = TDMT_MND_STB_META,
      .pCont   = msg,
      .contLen = msgLen,
  };
D
dapan1121 已提交
450

D
dapan1121 已提交
451 452
  SRpcMsg rpcRsp = {0};

D
dapan1121 已提交
453
  rpcSendRecv(pTransporter, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
454 455
  
  if (TSDB_CODE_SUCCESS != rpcRsp.code) {
D
dapan1121 已提交
456
    if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) {
D
dapan1121 已提交
457
      SET_META_TYPE_NULL(output->metaType);
D
dapan1121 已提交
458
      ctgDebug("stablemeta not exist in mnode, dbFName:%s, tbName:%s", dbFName, tbName);
D
dapan1121 已提交
459 460 461
      return TSDB_CODE_SUCCESS;
    }
    
D
dapan1121 已提交
462
    ctgError("error rsp for stablemeta from mnode, code:%s, dbFName:%s, tbName:%s", tstrerror(rpcRsp.code), dbFName, tbName);
D
dapan1121 已提交
463 464 465
    CTG_ERR_RET(rpcRsp.code);
  }

D
dapan1121 已提交
466 467
  code = queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_STB_META)](output, rpcRsp.pCont, rpcRsp.contLen);
  if (code) {
D
dapan1121 已提交
468
    ctgError("Process mnode stablemeta rsp failed, code:%x, dbFName:%s, tbName:%s", code, dbFName, tbName);
D
dapan1121 已提交
469 470 471
    CTG_ERR_RET(code);
  }

D
dapan1121 已提交
472
  ctgDebug("Got table meta from mnode, dbFName:%s, tbName:%s", dbFName, tbName);
D
dapan1121 已提交
473 474 475 476

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
477
int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMetaOutput* output) {
D
dapan1121 已提交
478 479
  char dbFName[TSDB_DB_FNAME_LEN];
  tNameGetFullDbName(pTableName, dbFName);
D
dapan1121 已提交
480

D
dapan1121 已提交
481
  return ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, dbFName, (char *)pTableName->tname, output);
D
dapan1121 已提交
482
}
D
dapan1121 已提交
483

D
dapan1121 已提交
484 485
int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) {
  if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName || NULL == vgroupInfo || NULL == output) {
D
dapan1121 已提交
486
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
487 488
  }

D
dapan1121 已提交
489 490
  char dbFName[TSDB_DB_FNAME_LEN];
  tNameGetFullDbName(pTableName, dbFName);
D
dapan1121 已提交
491

D
dapan1121 已提交
492
  ctgDebug("try to get table meta from vnode, dbFName:%s, tbName:%s", dbFName, tNameGetTableName(pTableName));
D
dapan1121 已提交
493

D
dapan1121 已提交
494
  SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char *)tNameGetTableName(pTableName)};
D
dapan1121 已提交
495 496 497
  char *msg = NULL;
  int32_t msgLen = 0;

D
dapan1121 已提交
498 499
  int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)](&bInput, &msg, 0, &msgLen);
  if (code) {
D
dapan1121 已提交
500
    ctgError("Build vnode tablemeta msg failed, code:%x, dbFName:%s, tbName:%s", code, dbFName, tNameGetTableName(pTableName));
D
dapan1121 已提交
501 502
    CTG_ERR_RET(code);
  }
D
dapan1121 已提交
503 504

  SRpcMsg rpcMsg = {
H
Hongze Cheng 已提交
505
      .msgType = TDMT_VND_TABLE_META,
D
dapan1121 已提交
506 507 508 509 510
      .pCont   = msg,
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
H
Haojun Liao 已提交
511
  rpcSendRecv(pTransporter, &vgroupInfo->epset, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
512
  
D
dapan1121 已提交
513
  if (TSDB_CODE_SUCCESS != rpcRsp.code) {
D
dapan1121 已提交
514
    if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) {
D
dapan1121 已提交
515
      SET_META_TYPE_NULL(output->metaType);
D
dapan1121 已提交
516
      ctgDebug("tablemeta not exist in vnode, dbFName:%s, tbName:%s", dbFName, tNameGetTableName(pTableName));
D
dapan1121 已提交
517 518 519
      return TSDB_CODE_SUCCESS;
    }
  
D
dapan1121 已提交
520
    ctgError("error rsp for table meta from vnode, code:%s, dbFName:%s, tbName:%s", tstrerror(rpcRsp.code), dbFName, tNameGetTableName(pTableName));
D
dapan1121 已提交
521
    CTG_ERR_RET(rpcRsp.code);
D
dapan1121 已提交
522 523
  }

D
dapan1121 已提交
524 525
  code = queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)](output, rpcRsp.pCont, rpcRsp.contLen);
  if (code) {
D
dapan1121 已提交
526
    ctgError("Process vnode tablemeta rsp failed, code:%s, dbFName:%s, tbName:%s", tstrerror(code), dbFName, tNameGetTableName(pTableName));
D
dapan1121 已提交
527 528 529
    CTG_ERR_RET(code);
  }

D
dapan1121 已提交
530
  ctgDebug("Got table meta from vnode, dbFName:%s, tbName:%s", dbFName, tNameGetTableName(pTableName));
D
dapan1121 已提交
531 532 533 534
  return TSDB_CODE_SUCCESS;
}


535 536
int32_t ctgGetHashFunction(int8_t hashMethod, tableNameHashFp *fp) {
  switch (hashMethod) {
D
dapan1121 已提交
537 538 539 540 541 542 543 544
    default:
      *fp = MurmurHash3_32;
      break;
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
545
int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, SDBVgroupInfo *dbInfo, SArray** vgroupList) {
D
dapan1121 已提交
546
  SHashObj *vgroupHash = NULL;
547
  SVgroupInfo *vgInfo = NULL;
D
dapan1121 已提交
548 549
  SArray *vgList = NULL;
  int32_t code = 0;
D
dapan1121 已提交
550
  int32_t vgNum = taosHashGetSize(dbInfo->vgHash);
551

D
dapan1121 已提交
552
  vgList = taosArrayInit(vgNum, sizeof(SVgroupInfo));
D
dapan1121 已提交
553
  if (NULL == vgList) {
D
dapan1121 已提交
554
    ctgError("taosArrayInit failed, num:%d", vgNum);
D
dapan 已提交
555 556 557
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);    
  }

D
dapan1121 已提交
558
  void *pIter = taosHashIterate(dbInfo->vgHash, NULL);
559 560
  while (pIter) {
    vgInfo = pIter;
D
dapan1121 已提交
561

D
dapan1121 已提交
562
    if (NULL == taosArrayPush(vgList, vgInfo)) {
D
dapan1121 已提交
563
      ctgError("taosArrayPush failed, vgId:%d", vgInfo->vgId);
D
dapan1121 已提交
564
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
565 566
    }
    
D
dapan1121 已提交
567
    pIter = taosHashIterate(dbInfo->vgHash, pIter);
568
    vgInfo = NULL;
D
dapan1121 已提交
569 570
  }

D
dapan1121 已提交
571 572 573
  *vgroupList = vgList;
  vgList = NULL;

D
dapan1121 已提交
574 575
  ctgDebug("Got vg list from DB, vgNum:%d", vgNum);

D
dapan1121 已提交
576
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
577 578 579 580 581 582 583 584

_return:

  if (vgList) {
    taosArrayDestroy(vgList);
  }

  CTG_RET(code);
D
dapan1121 已提交
585 586
}

D
dapan1121 已提交
587
int32_t ctgGetVgInfoFromHashValue(struct SCatalog *pCatalog, SDBVgroupInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup) {
D
dapan1121 已提交
588 589
  int32_t code = 0;
  
D
dapan1121 已提交
590
  int32_t vgNum = taosHashGetSize(dbInfo->vgHash);
H
Haojun Liao 已提交
591 592 593
  char db[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, db);

594
  if (vgNum <= 0) {
D
dapan1121 已提交
595
    ctgError("db vgroup cache invalid, db:%s, vgroup number:%d", db, vgNum);
D
dapan1121 已提交
596
    CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED);
D
dapan1121 已提交
597 598
  }

599 600
  tableNameHashFp fp = NULL;
  SVgroupInfo *vgInfo = NULL;
D
dapan1121 已提交
601

D
dapan1121 已提交
602
  CTG_ERR_RET(ctgGetHashFunction(dbInfo->hashMethod, &fp));
603 604

  char tbFullName[TSDB_TABLE_FNAME_LEN];
H
Haojun Liao 已提交
605
  tNameExtractFullName(pTableName, tbFullName);
606 607 608

  uint32_t hashValue = (*fp)(tbFullName, (uint32_t)strlen(tbFullName));

D
dapan1121 已提交
609
  void *pIter = taosHashIterate(dbInfo->vgHash, NULL);
610 611 612
  while (pIter) {
    vgInfo = pIter;
    if (hashValue >= vgInfo->hashBegin && hashValue <= vgInfo->hashEnd) {
D
dapan1121 已提交
613
      taosHashCancelIterate(dbInfo->vgHash, pIter);
614
      break;
D
dapan1121 已提交
615
    }
616
    
D
dapan1121 已提交
617
    pIter = taosHashIterate(dbInfo->vgHash, pIter);
618
    vgInfo = NULL;
D
dapan1121 已提交
619 620
  }

621
  if (NULL == vgInfo) {
D
dapan1121 已提交
622 623
    ctgError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, db, taosHashGetSize(dbInfo->vgHash));
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
624 625 626 627
  }

  *pVgroup = *vgInfo;

628
  CTG_RET(code);
D
dapan1121 已提交
629 630
}

D
dapan1121 已提交
631
int32_t ctgSTableVersionCompare(const void* key1, const void* key2) {
D
dapan 已提交
632
  if (*(uint64_t *)key1 < ((SSTableMetaVersion*)key2)->suid) {
D
dapan1121 已提交
633
    return -1;
D
dapan 已提交
634
  } else if (*(uint64_t *)key1 > ((SSTableMetaVersion*)key2)->suid) {
D
dapan1121 已提交
635 636 637 638 639 640 641
    return 1;
  } else {
    return 0;
  }
}

int32_t ctgDbVgVersionCompare(const void* key1, const void* key2) {
D
dapan1121 已提交
642
  if (*(int64_t *)key1 < ((SDbVgVersion*)key2)->dbId) {
D
dapan1121 已提交
643
    return -1;
D
dapan1121 已提交
644
  } else if (*(int64_t *)key1 > ((SDbVgVersion*)key2)->dbId) {
D
dapan1121 已提交
645 646 647
    return 1;
  } else {
    return 0;
D
dapan1121 已提交
648
  }
D
dapan1121 已提交
649 650 651
}


D
dapan1121 已提交
652
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) {
D
dapan1121 已提交
653 654 655 656
  mgmt->slotRIdx = 0;
  mgmt->slotNum = rentSec / CTG_RENT_SLOT_SECOND;
  mgmt->type = type;

D
dapan1121 已提交
657
  size_t msgSize = sizeof(SCtgRentSlot) * mgmt->slotNum;
D
dapan1121 已提交
658
  
D
dapan1121 已提交
659 660 661
  mgmt->slots = calloc(1, msgSize);
  if (NULL == mgmt->slots) {
    qError("calloc %d failed", (int32_t)msgSize);
D
dapan 已提交
662
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
663
  }
D
dapan1121 已提交
664

D
dapan1121 已提交
665 666 667 668
  qDebug("meta rent initialized, type:%d, slotNum:%d", type, mgmt->slotNum);
  
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
669

D
dapan1121 已提交
670

D
dapan1121 已提交
671
int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size) {
D
dapan1121 已提交
672
  int16_t widx = abs(id % mgmt->slotNum);
D
dapan1121 已提交
673

D
dapan1121 已提交
674
  SCtgRentSlot *slot = &mgmt->slots[widx];
D
dapan1121 已提交
675 676 677 678 679 680 681 682
  int32_t code = 0;
  
  CTG_LOCK(CTG_WRITE, &slot->lock);
  if (NULL == slot->meta) {
    slot->meta = taosArrayInit(CTG_DEFAULT_RENT_SLOT_SIZE, size);
    if (NULL == slot->meta) {
      qError("taosArrayInit %d failed, id:%"PRIx64", slot idx:%d, type:%d", CTG_DEFAULT_RENT_SLOT_SIZE, id, widx, mgmt->type);
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
683
    }
D
dapan1121 已提交
684
  }
D
dapan1121 已提交
685

D
dapan1121 已提交
686 687 688
  if (NULL == taosArrayPush(slot->meta, meta)) {
    qError("taosArrayPush meta to rent failed, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
689 690
  }

D
dapan1121 已提交
691
  slot->needSort = true;
D
dapan1121 已提交
692

D
dapan1121 已提交
693
  qDebug("add meta to rent, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
D
dapan1121 已提交
694

D
dapan1121 已提交
695 696 697 698 699 700
_return:

  CTG_UNLOCK(CTG_WRITE, &slot->lock);
  CTG_RET(code);
}

D
dapan1121 已提交
701
int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t compare) {
D
dapan1121 已提交
702
  int16_t widx = abs(id % mgmt->slotNum);
D
dapan1121 已提交
703

D
dapan1121 已提交
704
  SCtgRentSlot *slot = &mgmt->slots[widx];
D
dapan1121 已提交
705 706 707 708
  int32_t code = 0;
  
  CTG_LOCK(CTG_WRITE, &slot->lock);
  if (NULL == slot->meta) {
D
dapan1121 已提交
709 710
    qError("empty meta slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
711 712 713 714 715
  }

  if (slot->needSort) {
    taosArraySort(slot->meta, compare);
    slot->needSort = false;
D
dapan1121 已提交
716
    qDebug("meta slot sorted, slot idx:%d, type:%d", widx, mgmt->type);
D
dapan1121 已提交
717 718 719
  }

  void *orig = taosArraySearch(slot->meta, &id, compare, TD_EQ);
D
dapan1121 已提交
720
  if (NULL == orig) {
D
dapan1121 已提交
721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740
    qError("meta not found in slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  memcpy(orig, meta, size);

  qDebug("meta in rent updated, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);

_return:

  CTG_UNLOCK(CTG_WRITE, &slot->lock);

  if (code) {
    qWarn("meta in rent update failed, will try to add it, code:%x, id:%"PRIx64", slot idx:%d, type:%d", code, id, widx, mgmt->type);
    CTG_RET(ctgMetaRentAdd(mgmt, meta, id, size));
  }

  CTG_RET(code);
}

D
dapan1121 已提交
741
int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t compare) {
D
dapan1121 已提交
742 743
  int16_t widx = abs(id % mgmt->slotNum);

D
dapan1121 已提交
744
  SCtgRentSlot *slot = &mgmt->slots[widx];
D
dapan1121 已提交
745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776
  int32_t code = 0;
  
  CTG_LOCK(CTG_WRITE, &slot->lock);
  if (NULL == slot->meta) {
    qError("empty meta slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  if (slot->needSort) {
    taosArraySort(slot->meta, compare);
    slot->needSort = false;
    qDebug("meta slot sorted, slot idx:%d, type:%d", widx, mgmt->type);
  }

  int32_t idx = taosArraySearchIdx(slot->meta, &id, compare, TD_EQ);
  if (idx < 0) {
    qError("meta not found in slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  taosArrayRemove(slot->meta, idx);

  qDebug("meta in rent removed, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);

_return:

  CTG_UNLOCK(CTG_WRITE, &slot->lock);

  CTG_RET(code);
}


D
dapan1121 已提交
777
int32_t ctgMetaRentGetImpl(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
D
dapan1121 已提交
778 779 780 781
  int16_t ridx = atomic_add_fetch_16(&mgmt->slotRIdx, 1);
  if (ridx >= mgmt->slotNum) {
    ridx %= mgmt->slotNum;
    atomic_store_16(&mgmt->slotRIdx, ridx);
D
dapan1121 已提交
782
  }
D
dapan1121 已提交
783

D
dapan1121 已提交
784
  SCtgRentSlot *slot = &mgmt->slots[ridx];
D
dapan1121 已提交
785
  int32_t code = 0;
D
dapan1121 已提交
786
  
D
dapan1121 已提交
787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822
  CTG_LOCK(CTG_READ, &slot->lock);
  if (NULL == slot->meta) {
    qDebug("empty meta in slot:%d, type:%d", ridx, mgmt->type);
    *num = 0;
    goto _return;
  }

  size_t metaNum = taosArrayGetSize(slot->meta);
  if (metaNum <= 0) {
    qDebug("no meta in slot:%d, type:%d", ridx, mgmt->type);
    *num = 0;
    goto _return;
  }

  size_t msize = metaNum * size;
  *res = malloc(msize);
  if (NULL == *res) {
    qError("malloc %d failed", (int32_t)msize);
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
  }

  void *meta = taosArrayGet(slot->meta, 0);

  memcpy(*res, meta, msize);

  *num = (uint32_t)metaNum;

  qDebug("Got %d meta from rent, type:%d", (int32_t)metaNum, mgmt->type);

_return:

  CTG_UNLOCK(CTG_READ, &slot->lock);

  CTG_RET(code);
}

D
dapan1121 已提交
823
int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
D
dapan1121 已提交
824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842
  while (true) {
    int64_t msec = taosGetTimestampMs();
    int64_t lsec = atomic_load_64(&mgmt->lastReadMsec);
    if ((msec - lsec) < CTG_RENT_SLOT_SECOND * 1000) {
      *res = NULL;
      *num = 0;
      qDebug("too short time period to get expired meta, type:%d", mgmt->type);
      return TSDB_CODE_SUCCESS;
    }

    if (lsec != atomic_val_compare_exchange_64(&mgmt->lastReadMsec, lsec, msec)) {
      continue;
    }

    break;
  }

  CTG_ERR_RET(ctgMetaRentGetImpl(mgmt, res, num, size));

D
dapan1121 已提交
843 844 845
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
846 847 848 849 850 851 852 853 854 855 856 857 858
int32_t ctgAddDBCache(struct SCatalog *pCatalog, char *dbFName, SCtgDBCache *dbCache) {
  int32_t code = 0;
  if (taosHashPut(pCatalog->dbCache, dbFName, strlen(dbFName), dbCache, sizeof(SCtgDBCache))) {
    ctgError("taosHashPut db to cache failed, db:%s", dbFName);
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
  }
  
  SDbVgVersion vgVersion = {.dbId = dbCache->dbId, .vgVersion = dbCache->vgInfo ? dbCache->vgInfo->vgVersion : -1};
  strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));

  ctgDebug("dbCache added, dbFName:%s, vgVersion:%d, dbId:%"PRIx64, dbFName, vgVersion.vgVersion, dbCache->dbId);
  
  CTG_ERR_JRET(ctgMetaRentAdd(&pCatalog->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion)));
D
dapan1121 已提交
859

D
dapan1121 已提交
860
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
861

D
dapan1121 已提交
862
_return:
D
dapan1121 已提交
863

D
dapan1121 已提交
864
  ctgFreeDbCache(dbCache);
D
dapan1121 已提交
865

D
dapan1121 已提交
866 867
  CTG_RET(code);
}
D
dapan1121 已提交
868

D
dapan1121 已提交
869 870 871 872 873 874 875

int32_t ctgUpdateTbMetaImpl(struct SCatalog *pCatalog, SCtgTbMetaCache *tbCache, char *dbFName, char *tbName, STableMeta *meta, int32_t metaSize) {
  CTG_LOCK(CTG_READ, &tbCache->metaLock);  
  if (taosHashPut(tbCache->metaCache, tbName, strlen(tbName), meta, metaSize) != 0) {
    CTG_UNLOCK(CTG_READ, &tbCache->metaLock);  
    ctgError("taosHashPut tbmeta to cache failed, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
876
  }
D
dapan1121 已提交
877 878 879
  CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
  
  ctgDebug("tbmeta updated to cache, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
D
dapan1121 已提交
880

D
dapan1121 已提交
881 882
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
883

D
dapan1121 已提交
884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901
int32_t ctgUpdateStbMetaImpl(struct SCatalog *pCatalog, SCtgTbMetaCache *tbCache, char *dbFName, char *tbName, STableMeta *meta, int32_t metaSize) {
  bool newAdded = false;
  int32_t code = 0;
  SSTableMetaVersion metaRent = {.suid = meta->suid, .sversion = meta->sversion, .tversion = meta->tversion};
  strcpy(metaRent.dbFName, dbFName);
  strcpy(metaRent.stbName, tbName);
  
  CTG_LOCK(CTG_WRITE, &tbCache->stbLock);
  
  CTG_LOCK(CTG_READ, &tbCache->metaLock);
  STableMeta *orig = taosHashAcquire(tbCache->metaCache,  tbName, strlen(tbName));
  if (orig) {
    if (orig->suid != meta->suid) {
      if (taosHashRemove(tbCache->stbCache, &orig->suid, sizeof(orig->suid))) {
        ctgError("stb not exist in stbCache, db:%s, stb:%s, suid:%"PRIx64, dbFName, tbName, orig->suid);
      }
      
      ctgMetaRentRemove(&pCatalog->stbRent, orig->suid, ctgSTableVersionCompare);
D
dapan1121 已提交
902
    }
D
dapan1121 已提交
903 904
  
    taosHashRelease(tbCache->metaCache, orig);
D
dapan1121 已提交
905
  }
D
dapan1121 已提交
906
  CTG_UNLOCK(CTG_READ, &tbCache->metaLock);    
D
dapan1121 已提交
907

D
dapan1121 已提交
908
  CTG_ERR_JRET(ctgUpdateTbMetaImpl(pCatalog, tbCache, dbFName, tbName, meta, metaSize));
D
dapan1121 已提交
909

D
dapan1121 已提交
910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927
  CTG_LOCK(CTG_READ, &tbCache->metaLock);  
  STableMeta *tbMeta = taosHashGet(tbCache->metaCache, tbName, strlen(tbName));
  if (taosHashPutExt(tbCache->stbCache, &meta->suid, sizeof(meta->suid), &tbMeta, POINTER_BYTES, &newAdded) != 0) {
    CTG_UNLOCK(CTG_READ, &tbCache->metaLock);    
    CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
    ctgError("taosHashPutExt stable to stable cache failed, suid:%"PRIx64, meta->suid);
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
  }
  CTG_UNLOCK(CTG_READ, &tbCache->metaLock);    
  
  CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
  
  ctgDebug("update stable to cache, suid:%"PRIx64, meta->suid);
  
  if (newAdded) {
    CTG_ERR_RET(ctgMetaRentAdd(&pCatalog->stbRent, &metaRent, metaRent.suid, sizeof(SSTableMetaVersion)));
  } else {
    CTG_ERR_RET(ctgMetaRentUpdate(&pCatalog->stbRent, &metaRent, metaRent.suid, sizeof(SSTableMetaVersion), ctgSTableVersionCompare));
D
dapan1121 已提交
928
  }
D
dapan1121 已提交
929

D
dapan1121 已提交
930
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
931

D
dapan1121 已提交
932 933 934 935 936 937 938 939 940 941 942 943 944 945 946
_return:

  CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);

  CTG_RET(code);
}


int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output) {
  int32_t code = 0;
  SCtgDBCache *dbCache = NULL;

  if ((!CTG_IS_META_CTABLE(output->metaType)) && NULL == output->tbMeta) {
    ctgError("no valid tbmeta got from meta rsp, dbFName:%s, tbName:%s", output->dbFName, output->tbName);
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
947 948
  }

D
dapan1121 已提交
949 950 951 952 953 954
  CTG_ERR_RET(ctgInitDBCache(pCatalog));

  CTG_ERR_JRET(ctgAcquireDBCache(pCatalog, output->dbFName, output->dbId, &dbCache));

  CTG_ERR_JRET(ctgInitTbMetaCache(pCatalog, dbCache));
  CTG_ERR_JRET(ctgInitStbCache(pCatalog, dbCache));
D
dapan1121 已提交
955

D
dapan1121 已提交
956 957
  if (CTG_IS_META_CTABLE(output->metaType) || CTG_IS_META_BOTH(output->metaType)) {
    CTG_ERR_JRET(ctgUpdateTbMetaImpl(pCatalog, &dbCache->tbCache, output->ctbName, (STableMeta *)&output->ctbMeta, sizeof(output->ctbMeta)));
D
dapan1121 已提交
958
  }
D
dapan1121 已提交
959

D
dapan1121 已提交
960
  if (CTG_IS_META_CTABLE(output->metaType)) {
D
dapan1121 已提交
961
    goto _return;
D
dapan1121 已提交
962
  }
D
dapan1121 已提交
963 964 965
  
  if (CTG_IS_META_BOTH(output->metaType) && TSDB_SUPER_TABLE != output->tbMeta->tableType) {
    ctgError("table type error, expected:%d, actual:%d", TSDB_SUPER_TABLE, output->tbMeta->tableType);
D
dapan1121 已提交
966
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
967
  }    
D
dapan1121 已提交
968

D
dapan1121 已提交
969
  int32_t tbSize = sizeof(*output->tbMeta) + sizeof(SSchema) * (output->tbMeta->tableInfo.numOfColumns + output->tbMeta->tableInfo.numOfTags);
D
dapan1121 已提交
970 971

  if (TSDB_SUPER_TABLE == output->tbMeta->tableType) {
D
dapan1121 已提交
972
    CTG_ERR_JRET(ctgUpdateStbMetaImpl(pCatalog, &dbCache->tbCache, output->dbFName, output->tbName, output->tbMeta, tbSize));
D
dapan1121 已提交
973
  } else {
D
dapan1121 已提交
974
    CTG_ERR_JRET(ctgUpdateTbMetaImpl(pCatalog, &dbCache->tbCache, output->dbFName, output->tbName, output->tbMeta, tbSize));
D
dapan1121 已提交
975
  }
D
dapan1121 已提交
976

D
dapan1121 已提交
977 978 979 980 981
_return:

  if (dbCache) {
    taosHashRelease(pCatalog->dbCache, dbCache);
  }
D
dapan1121 已提交
982

D
dapan1121 已提交
983
  CTG_RET(code);
D
dapan1121 已提交
984 985
}

D
dapan1121 已提交
986
int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, bool forceUpdate, SCtgDBCache** dbCache) {
D
dapan1121 已提交
987
  bool inCache = false;
988
  if (!forceUpdate) {
D
dapan1121 已提交
989
    CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbFName, dbCache, &inCache));
D
dapan1121 已提交
990
    if (inCache) {
D
dapan1121 已提交
991 992
      return TSDB_CODE_SUCCESS;
    }
993

D
dapan1121 已提交
994
    ctgDebug("failed to get DB vgroupInfo from cache, dbName:%s, load it from mnode, update:%d", dbFName, forceUpdate);
D
dapan1121 已提交
995 996 997 998 999
  }

  SUseDbOutput DbOut = {0};
  SBuildUseDBInput input = {0};

D
dapan1121 已提交
1000
  tstrncpy(input.db, dbFName, tListLen(input.db));
D
dapan1121 已提交
1001
  input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
H
Haojun Liao 已提交
1002

D
dapan1121 已提交
1003 1004
  while (true) {
    CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &input, &DbOut));
D
dapan1121 已提交
1005 1006
    CTG_ERR_RET(catalogUpdateDBVgroup(pCatalog, dbFName, DbOut.dbVgroup));
    CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbFName, dbCache, &inCache));
D
dapan1121 已提交
1007 1008

    if (!inCache) {
D
dapan1121 已提交
1009
      ctgWarn("can't get db vgroup from cache, will retry, db:%s", dbFName);
D
dapan1121 已提交
1010 1011 1012 1013 1014 1015 1016 1017 1018
      continue;
    }

    break;
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1019 1020 1021 1022 1023 1024 1025
void ctgRemoveAndFreeTableMeta(struct SCatalog* pCatalog, SCtgTbMetaCache *cache) {
  CTG_LOCK(CTG_WRITE, &cache->stbLock);
  if (cache->stbCache) {
    void *pIter = taosHashIterate(cache->stbCache, NULL);
    while (pIter) {
      uint64_t suid = 0;
      taosHashGetKey(pIter, &suid, NULL);
D
dapan1121 已提交
1026

D
dapan1121 已提交
1027 1028 1029 1030 1031 1032 1033
      CTG_ERR_RET(ctgMetaRentRemove(&pCatalog->stbRent, suid, ctgSTableVersionCompare));
      ctgDebug("stb removed from rent, suid:%"PRIx64, suid);
          
      pIter = taosHashIterate(cache->stbCache, pIter);
    }
  }
  CTG_UNLOCK(CTG_WRITE, &cache->stbLock);
D
dapan1121 已提交
1034

D
dapan1121 已提交
1035 1036 1037 1038 1039 1040 1041
  ctgFreeTableMetaCache(cache);
}

int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, SCtgDBCache *dbCache, const char* dbFName) {
  if (taosHashRemove(pCatalog->dbCache, dbFName, strlen(dbFName))) {
    ctgError("taosHashRemove from dbCache failed, dbFName:%s", dbFName);
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
1042
  }
D
dapan1121 已提交
1043 1044

  atomic_store_8(&dbCache->deleted, 1);
D
dapan1121 已提交
1045
  
D
dapan1121 已提交
1046
  CTG_LOCK(CTG_WRITE, &dbCache->vgLock);
D
dapan1121 已提交
1047 1048 1049 1050 1051 1052
  if (dbCache->vgInfo) {
    ctgInfo("cleanup db vgInfo, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId);
    
    if (dbCache->vgInfo->vgHash) {
      taosHashCleanup(dbCache->vgInfo->vgHash);
    }
D
dapan1121 已提交
1053
    
D
dapan1121 已提交
1054
    tfree(dbCache->vgInfo);
D
dapan1121 已提交
1055
  }
D
dapan1121 已提交
1056
  CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
D
dapan1121 已提交
1057

D
dapan1121 已提交
1058 1059 1060
  ctgRemoveAndFreeTableMeta(pCatalog, &dbCache->tbCache);

  ctgInfo("db removed from cache, dbFName:%s, uid:%"PRIx64, dbFName, dbCache->dbId);
D
dapan1121 已提交
1061

D
dapan1121 已提交
1062
  CTG_ERR_RET(ctgMetaRentRemove(&pCatalog->dbRent, dbCache->dbId, ctgDbVgVersionCompare));
D
dapan1121 已提交
1063
  
D
dapan1121 已提交
1064 1065 1066 1067
  ctgDebug("db removed from rent, dbFName:%s, uid:%"PRIx64, dbFName, dbCache->dbId);
  
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
1068

D
dapan1121 已提交
1069

D
dapan1121 已提交
1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085
int32_t ctgAcquireDBCache(struct SCatalog* pCatalog, const char *dbFName, uint64_t dbId, SCtgDBCache **pCache) {
  int32_t code = 0;
  SCtgDBCache *dbCache = NULL;
  
  while (true) {
    dbCache = (SCtgDBCache *)taosHashAcquire(pCatalog->dbCache, dbFName, strlen(dbFName));
    if (dbCache) {
      if (dbCache->dbId == dbId) {
        *pCache = dbCache;
        return TSDB_CODE_SUCCESS;
      }
      
      CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbCache, dbFName));
      taosHashRelease(pCatalog->dbCache, dbCache);
      dbCache = NULL;
    }
D
dapan1121 已提交
1086

D
dapan1121 已提交
1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097
    SCtgDBCache newDBCache = {0};
    newDBCache.dbId = dbId;
    
    CTG_ERR_JRET(ctgAddDBCache(pCatalog, dbFName, &newDBCache));
  }

_return:

  if (dbCache) {
    taosHashRelease(pCatalog->dbCache, dbCache);
  }
D
dapan1121 已提交
1098
  
D
dapan1121 已提交
1099
  CTG_RET(code);
D
dapan1121 已提交
1100 1101
}

D
dapan1121 已提交
1102

D
dapan 已提交
1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119
int32_t ctgValidateAndRemoveStbMeta(struct SCatalog* pCatalog, const char* dbName, const char* stbName, uint64_t suid, bool *removed) {
  *removed = false;

  SCtgDBCache *dbCache = (SCtgDBCache *)taosHashAcquire(pCatalog->dbCache, dbName, strlen(dbName));
  if (NULL == dbCache) {
    ctgInfo("db not exist in dbCache, may be removed, db:%s", dbName);
    return TSDB_CODE_SUCCESS;
  }

  CTG_LOCK(CTG_WRITE, &dbCache->tbCache.stbLock);
  if (taosHashRemove(dbCache->tbCache.stbCache, &suid, sizeof(suid))) {
    CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock);
    taosHashRelease(pCatalog->dbCache, dbCache);
    ctgInfo("stb not exist in stbCache, may be removed, db:%s, stb:%s, suid:%"PRIx64, dbName, stbName, suid);
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
1120 1121 1122
  CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);
  if (taosHashRemove(dbCache->tbCache.metaCache, stbName, strlen(stbName))) {  
    CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
D
dapan 已提交
1123 1124 1125 1126
    CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock);
    taosHashRelease(pCatalog->dbCache, dbCache);
    ctgError("stb not exist in cache, db:%s, stb:%s, suid:%"PRIx64, dbName, stbName, suid);
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
1127 1128 1129
  }  
  CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
  
D
dapan 已提交
1130 1131 1132 1133 1134 1135 1136 1137 1138 1139
  CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock);
  
  taosHashRelease(pCatalog->dbCache, dbCache);

  *removed = true;
  
  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
1140

D
dapan1121 已提交
1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155
int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) {
  if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  SVgroupInfo vgroupInfo = {0};
  int32_t code = 0;

  CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo));

  STableMetaOutput voutput = {0};
  STableMetaOutput moutput = {0};
  STableMetaOutput *output = &voutput;

  if (CTG_IS_STABLE(isSTable)) {
D
dapan1121 已提交
1156
    ctgDebug("will renew tbmeta, supposed to be stb, tbName:%s", tNameGetTableName(pTableName));
D
dapan1121 已提交
1157 1158

    // if get from mnode failed, will not try vnode
D
dapan1121 已提交
1159 1160
    CTG_ERR_JRET(ctgGetTableMetaFromMnode(pCatalog, pTransporter, pMgmtEps, pTableName, &moutput));

D
dapan1121 已提交
1161
    if (CTG_IS_META_NULL(moutput.metaType)) {
D
dapan1121 已提交
1162 1163 1164 1165 1166
      CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &voutput));
    } else {
      output = &moutput;
    }
  } else {
D
dapan1121 已提交
1167
    ctgDebug("will renew tbmeta, not supposed to be stb, tbName:%s, isStable:%d", tNameGetTableName(pTableName), isSTable);
D
dapan1121 已提交
1168 1169

    // if get from vnode failed or no table meta, will not try mnode
D
dapan1121 已提交
1170 1171
    CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &voutput));

D
dapan1121 已提交
1172
    if (CTG_IS_META_TABLE(voutput.metaType) && TSDB_SUPER_TABLE == voutput.tbMeta->tableType) {
D
dapan1121 已提交
1173
      ctgDebug("will continue to renew tbmeta since got stb, tbName:%s, metaType:%d", tNameGetTableName(pTableName), voutput.metaType);
D
dapan1121 已提交
1174
      
D
dapan1121 已提交
1175
      CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, voutput.dbFName, voutput.tbName, &moutput));
D
dapan1121 已提交
1176

D
dapan1121 已提交
1177 1178
      voutput.metaType = moutput.metaType;
      
D
dapan1121 已提交
1179 1180 1181
      tfree(voutput.tbMeta);
      voutput.tbMeta = moutput.tbMeta;
      moutput.tbMeta = NULL;
D
dapan1121 已提交
1182 1183
    } else if (CTG_IS_META_BOTH(voutput.metaType)) {
      int32_t exist = 0;
D
dapan1121 已提交
1184
      CTG_ERR_JRET(ctgIsTableMetaExistInCache(pCatalog, voutput.dbFName, voutput.tbName, &exist));
D
dapan1121 已提交
1185
      if (0 == exist) {
D
dapan1121 已提交
1186
        CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, voutput.dbFName, voutput.tbName, &moutput));
D
dapan1121 已提交
1187

D
dapan1121 已提交
1188 1189
        if (CTG_IS_META_NULL(moutput.metaType)) {
          SET_META_TYPE_NULL(voutput.metaType);
D
dapan1121 已提交
1190 1191 1192 1193 1194 1195
        }
        
        tfree(voutput.tbMeta);
        voutput.tbMeta = moutput.tbMeta;
        moutput.tbMeta = NULL;
      } else {
D
dapan1121 已提交
1196 1197
        tfree(voutput.tbMeta);
        
D
dapan1121 已提交
1198 1199
        SET_META_TYPE_CTABLE(voutput.metaType); 
      }
D
dapan1121 已提交
1200 1201 1202
    }
  }

D
dapan1121 已提交
1203
  if (CTG_IS_META_NULL(output->metaType)) {
1204
    ctgError("no tablemeta got, tbNmae:%s", tNameGetTableName(pTableName));
D
dapan1121 已提交
1205 1206 1207
    CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
  }

D
dapan1121 已提交
1208 1209 1210 1211 1212 1213 1214 1215 1216 1217
  CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, output));

_return:

  tfree(voutput.tbMeta);
  tfree(moutput.tbMeta);
  
  CTG_RET(code);
}

D
dapan1121 已提交
1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243
int32_t ctgGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, bool forceUpdate, STableMeta** pTableMeta, int32_t isSTable) {
  if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }
  
  int32_t exist = 0;

  if (!forceUpdate) {
    CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pTableName, pTableMeta, &exist));

    if (exist && CTG_TBTYPE_MATCH(isSTable, (*pTableMeta)->tableType)) {
      return TSDB_CODE_SUCCESS;
    }
  } else if (CTG_IS_UNKNOWN_STABLE(isSTable)) {
    int32_t tbType = 0;
    
    CTG_ERR_RET(ctgGetTableTypeFromCache(pCatalog, pTableName, &tbType));

    CTG_SET_STABLE(isSTable, tbType);
  }

  CTG_ERR_RET(ctgRenewTableMetaImpl(pCatalog, pRpc, pMgmtEps, pTableName, isSTable));

  CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pTableName, pTableMeta, &exist));

  if (0 == exist) {
1244
    ctgError("renew tablemeta succeed but get from cache failed, may be deleted, tbName:%s", tNameGetTableName(pTableName));
D
dapan1121 已提交
1245 1246 1247 1248 1249 1250 1251
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }
  
  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
1252
int32_t catalogInit(SCatalogCfg *cfg) {
D
dapan1121 已提交
1253
  if (ctgMgmt.pCluster) {
D
dapan 已提交
1254
    qError("catalog already initialized");
D
dapan1121 已提交
1255
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
1256 1257 1258 1259
  }

  if (cfg) {
    memcpy(&ctgMgmt.cfg, cfg, sizeof(*cfg));
H
Haojun Liao 已提交
1260

D
dapan1121 已提交
1261 1262 1263 1264 1265 1266 1267
    if (ctgMgmt.cfg.maxDBCacheNum == 0) {
      ctgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
    }

    if (ctgMgmt.cfg.maxTblCacheNum == 0) {
      ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TABLEMETA_NUMBER;
    }
D
dapan1121 已提交
1268 1269 1270 1271 1272

    if (ctgMgmt.cfg.dbRentSec == 0) {
      ctgMgmt.cfg.dbRentSec = CTG_DEFAULT_RENT_SECOND;
    }

D
dapan1121 已提交
1273 1274
    if (ctgMgmt.cfg.stbRentSec == 0) {
      ctgMgmt.cfg.stbRentSec = CTG_DEFAULT_RENT_SECOND;
D
dapan1121 已提交
1275
    }
D
dapan1121 已提交
1276 1277 1278
  } else {
    ctgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
    ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TABLEMETA_NUMBER;
D
dapan1121 已提交
1279
    ctgMgmt.cfg.dbRentSec = CTG_DEFAULT_RENT_SECOND;
D
dapan1121 已提交
1280
    ctgMgmt.cfg.stbRentSec = CTG_DEFAULT_RENT_SECOND;
D
dapan 已提交
1281 1282
  }

D
dapan1121 已提交
1283
  ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
D
dapan1121 已提交
1284
  if (NULL == ctgMgmt.pCluster) {
D
dapan1121 已提交
1285 1286
    qError("taosHashInit %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER);
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
1287 1288
  }

D
dapan1121 已提交
1289
  qDebug("catalog initialized, maxDb:%u, maxTbl:%u, dbRentSec:%u, stbRentSec:%u", ctgMgmt.cfg.maxDBCacheNum, ctgMgmt.cfg.maxTblCacheNum, ctgMgmt.cfg.dbRentSec, ctgMgmt.cfg.stbRentSec);
D
dapan1121 已提交
1290

D
dapan 已提交
1291
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1292 1293
}

1294 1295
int32_t catalogGetHandle(uint64_t clusterId, struct SCatalog** catalogHandle) {
  if (NULL == catalogHandle) {
D
dapan1121 已提交
1296
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan 已提交
1297 1298 1299
  }

  if (NULL == ctgMgmt.pCluster) {
D
dapan 已提交
1300
    qError("catalog cluster cache are not ready, clusterId:%"PRIx64, clusterId);
D
dapan1121 已提交
1301
    CTG_ERR_RET(TSDB_CODE_CTG_NOT_READY);
D
dapan 已提交
1302 1303
  }

D
dapan1121 已提交
1304 1305
  int32_t code = 0;
  SCatalog *clusterCtg = NULL;
D
dapan 已提交
1306

D
dapan1121 已提交
1307 1308
  while (true) {
    SCatalog **ctg = (SCatalog **)taosHashGet(ctgMgmt.pCluster, (char*)&clusterId, sizeof(clusterId));
D
dapan 已提交
1309

D
dapan1121 已提交
1310 1311 1312 1313 1314
    if (ctg && (*ctg)) {
      *catalogHandle = *ctg;
      qDebug("got catalog handle from cache, clusterId:%"PRIx64", CTG:%p", clusterId, *ctg);
      return TSDB_CODE_SUCCESS;
    }
D
dapan 已提交
1315

D
dapan1121 已提交
1316 1317 1318 1319 1320 1321
    clusterCtg = calloc(1, sizeof(SCatalog));
    if (NULL == clusterCtg) {
      qError("calloc %d failed", (int32_t)sizeof(SCatalog));
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }

D
dapan1121 已提交
1322 1323
    clusterCtg->clusterId = clusterId;

D
dapan1121 已提交
1324
    CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->dbRent, ctgMgmt.cfg.dbRentSec, CTG_RENT_DB));
D
dapan1121 已提交
1325
    CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->stbRent, ctgMgmt.cfg.stbRentSec, CTG_RENT_STABLE));
D
dapan1121 已提交
1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340

    code = taosHashPut(ctgMgmt.pCluster, &clusterId, sizeof(clusterId), &clusterCtg, POINTER_BYTES);
    if (code) {
      if (HASH_NODE_EXIST(code)) {
        ctgFreeHandle(clusterCtg);
        continue;
      }
      
      qError("taosHashPut CTG to cache failed, clusterId:%"PRIx64, clusterId);
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
    }

    qDebug("add CTG to cache, clusterId:%"PRIx64", CTG:%p", clusterId, clusterCtg);

    break;
D
dapan 已提交
1341
  }
D
dapan1121 已提交
1342 1343

  *catalogHandle = clusterCtg;
D
dapan 已提交
1344
  
D
dapan1121 已提交
1345
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357

_return:

  ctgFreeHandle(clusterCtg);
  
  CTG_RET(code);
}

void catalogFreeHandle(struct SCatalog* pCatalog) {
  if (NULL == pCatalog) {
    return;
  }
D
dapan1121 已提交
1358 1359 1360 1361 1362 1363 1364

  if (taosHashRemove(ctgMgmt.pCluster, &pCatalog->clusterId, sizeof(pCatalog->clusterId))) {
    ctgWarn("taosHashRemove from cluster failed, may already be freed, clusterId:%"PRIx64, pCatalog->clusterId);
    return;
  }

  uint64_t clusterId = pCatalog->clusterId;
D
dapan1121 已提交
1365 1366
  
  ctgFreeHandle(pCatalog);
D
dapan1121 已提交
1367 1368

  ctgInfo("handle freed, culsterId:%"PRIx64, clusterId);
D
dapan 已提交
1369 1370
}

D
dapan1121 已提交
1371 1372
int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version) {
  if (NULL == pCatalog || NULL == dbName || NULL == version) {
D
dapan1121 已提交
1373
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
1374 1375
  }

D
dapan1121 已提交
1376
  if (NULL == pCatalog->dbCache) {
D
dapan1121 已提交
1377
    *version = CTG_DEFAULT_INVALID_VERSION;
D
dapan1121 已提交
1378
    ctgInfo("empty db cache, dbName:%s", dbName);
D
dapan1121 已提交
1379 1380 1381
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
1382 1383
  SCtgDBCache *db = taosHashAcquire(pCatalog->dbCache, dbName, strlen(dbName));
  if (NULL == db) {
D
dapan1121 已提交
1384
    *version = CTG_DEFAULT_INVALID_VERSION;
D
dapan1121 已提交
1385
    ctgInfo("db not in cache, dbName:%s", dbName);
D
dapan1121 已提交
1386 1387 1388
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402
  CTG_LOCK(CTG_READ, &db->vgLock);
  
  if (NULL == db->vgInfo) {
    CTG_UNLOCK(CTG_READ, &db->vgLock);

    *version = CTG_DEFAULT_INVALID_VERSION;
    ctgInfo("db not in cache, dbName:%s", dbName);
    return TSDB_CODE_SUCCESS;
  }

  *version = db->vgInfo->vgVersion;
  CTG_UNLOCK(CTG_READ, &db->vgLock);
  
  taosHashRelease(pCatalog->dbCache, db);
D
dapan1121 已提交
1403

D
dapan1121 已提交
1404 1405
  ctgDebug("Got db vgVersion from cache, dbName:%s, vgVersion:%d", dbName, *version);

D
dapan1121 已提交
1406 1407 1408
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1409
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, bool forceUpdate, SArray** vgroupList) {
D
dapan1121 已提交
1410 1411 1412 1413
  if (NULL == pCatalog || NULL == dbName || NULL == pRpc || NULL == pMgmtEps || NULL == vgroupList) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

D
dapan1121 已提交
1414
  SCtgDBCache* dbCache   = NULL;
D
dapan1121 已提交
1415
  SVgroupInfo *vgInfo = NULL;
1416 1417

  int32_t code = 0;
D
dapan1121 已提交
1418
  SArray *vgList = NULL;
D
dapan1121 已提交
1419
  CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, dbFName, forceUpdate, &dbCache));
D
dapan1121 已提交
1420

D
dapan1121 已提交
1421 1422
  int32_t vgNum = (int32_t)taosHashGetSize(dbCache->vgInfo->vgHash);
  vgList = taosArrayInit(vgNum, sizeof(SVgroupInfo));
D
dapan1121 已提交
1423
  if (NULL == vgList) {
D
dapan1121 已提交
1424
    ctgError("taosArrayInit %d failed", vgNum);
D
dapan1121 已提交
1425 1426 1427
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);    
  }

D
dapan1121 已提交
1428
  void *pIter = taosHashIterate(dbCache->vgInfo->vgHash, NULL);
D
dapan1121 已提交
1429 1430 1431 1432
  while (pIter) {
    vgInfo = pIter;

    if (NULL == taosArrayPush(vgList, vgInfo)) {
D
dapan1121 已提交
1433
      ctgError("taosArrayPush failed, vgId:%d", vgInfo->vgId);
D
dapan1121 已提交
1434
      taosHashCancelIterate(dbCache->vgInfo->vgHash, pIter);
D
dapan1121 已提交
1435 1436 1437
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
    }
    
D
dapan1121 已提交
1438
    pIter = taosHashIterate(dbCache->vgInfo->vgHash, pIter);
D
dapan1121 已提交
1439 1440 1441 1442 1443 1444 1445
    vgInfo = NULL;
  }

  *vgroupList = vgList;
  vgList = NULL;

_return:
D
dapan1121 已提交
1446 1447 1448 1449

  if (dbCache) {
    CTG_UNLOCK(CTG_READ, &dbCache->vgLock);
    taosHashRelease(pCatalog->dbCache, dbCache);
D
dapan1121 已提交
1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460
  }

  if (vgList) {
    taosArrayDestroy(vgList);
    vgList = NULL;
  }

  CTG_RET(code);  
}


D
dapan1121 已提交
1461
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbFName, uint64_t dbId, SDBVgroupInfo* dbInfo) {
D
dapan1121 已提交
1462 1463
  int32_t code = 0;
  
D
dapan1121 已提交
1464
  if (NULL == pCatalog || NULL == dbFName || NULL == dbInfo) {
D
dapan1121 已提交
1465 1466 1467
    CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT);
  }

D
dapan1121 已提交
1468
  if (NULL == dbInfo->vgHash || dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) {
D
dapan1121 已提交
1469
    ctgError("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d", dbFName, dbInfo->vgHash, dbInfo->vgVersion);
D
dapan1121 已提交
1470
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
1471 1472
  }

D
dapan1121 已提交
1473
  CTG_ERR_JRET(ctgInitDBCache(pCatalog));
D
dapan1121 已提交
1474 1475

  bool newAdded = false;
D
dapan1121 已提交
1476
  SDbVgVersion vgVersion = {.dbId = dbId, .vgVersion = dbInfo->vgVersion};
D
dapan1121 已提交
1477

D
dapan1121 已提交
1478 1479 1480 1481 1482 1483
  SCtgDBCache *dbCache = NULL;
  CTG_ERR_JRET(ctgAcquireDBCache(pCatalog, dbFName, dbId, &dbCache));
  
  CTG_LOCK(CTG_WRITE, &dbCache->vgLock);
  if (dbCache->deleted) {
    ctgInfo("db is dropping, dbFName:%s, dbId:%"PRIx64, dbFName, dbInfo->dbId);
D
dapan1121 已提交
1484 1485
    CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
    taosHashRelease(pCatalog->dbCache, dbCache);
D
dapan1121 已提交
1486 1487 1488 1489 1490
    CTG_ERR_JRET(TSDB_CODE_CTG_DB_DROPPED);
  }
  
  if (NULL == dbCache->vgInfo) {
    dbCache->vgInfo = dbInfo;
D
dapan1121 已提交
1491
  } else {
D
dapan1121 已提交
1492 1493 1494 1495 1496 1497 1498
    if (dbInfo->vgVersion <= dbCache->vgInfo->vgVersion) {
      ctgInfo("db vgVersion is old, dbFName:%s, vgVersion:%d, current:%d", dbFName, dbInfo->vgVersion, dbCache->vgInfo->vgVersion);
      CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
      taosHashRelease(pCatalog->dbCache, dbCache);
      
      goto _return;
    }
D
dapan1121 已提交
1499
    
D
dapan1121 已提交
1500 1501 1502 1503
    if (dbCache->vgInfo->vgHash) {
      ctgInfo("cleanup db vgHash, dbFName:%s", dbFName);
      taosHashCleanup(dbCache->vgInfo->vgHash);
      dbCache->vgInfo->vgHash = NULL;
D
dapan1121 已提交
1504 1505
    }

D
dapan1121 已提交
1506 1507
    tfree(dbCache->vgInfo);
    dbCache->vgInfo = dbInfo;
D
dapan1121 已提交
1508 1509
  }

D
dapan1121 已提交
1510
  dbInfo = NULL;
D
dapan1121 已提交
1511

D
dapan1121 已提交
1512 1513 1514 1515 1516
  CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
  taosHashRelease(pCatalog->dbCache, dbCache);

  strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
  CTG_ERR_JRET(ctgMetaRentUpdate(&pCatalog->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion), ctgDbVgVersionCompare));
D
dapan1121 已提交
1517

D
dapan1121 已提交
1518
  ctgDebug("dbCache updated, dbFName:%s, vgVersion:%d, dbId:%"PRIx64, dbFName, vgVersion.vgVersion, vgVersion.dbId);
D
dapan1121 已提交
1519 1520 1521

_return:

D
dapan1121 已提交
1522 1523 1524 1525
  if (dbInfo) {
    taosHashCleanup(dbInfo->vgHash);
    dbInfo->vgHash = NULL;
    tfree(dbInfo);
D
dapan1121 已提交
1526
  }
D
dapan1121 已提交
1527 1528 1529 1530 1531
  
  CTG_RET(code);
}


D
dapan1121 已提交
1532
int32_t catalogRemoveDB(struct SCatalog* pCatalog, const char* dbFName, uint64_t dbId) {
D
dapan1121 已提交
1533 1534
  int32_t code = 0;
  
D
dapan1121 已提交
1535
  if (NULL == pCatalog || NULL == dbFName) {
D
dapan1121 已提交
1536
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
1537 1538
  }

D
dapan1121 已提交
1539
  if (NULL == pCatalog->dbCache) {
D
dapan1121 已提交
1540
    return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1541
  }
D
dapan1121 已提交
1542 1543 1544 1545

  SCtgDBCache *dbCache = (SCtgDBCache *)taosHashAcquire(pCatalog->dbCache, dbFName, strlen(dbFName));
  if (NULL == dbCache) {
    ctgInfo("db not exist in dbCache, may be removed, dbFName:%s", dbFName);
D
dapan1121 已提交
1546
    return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1547
  }
D
dapan1121 已提交
1548

D
dapan1121 已提交
1549 1550 1551 1552
  if (dbCache->dbId != dbId) {
    ctgInfo("db id already updated, dbFName:%s, dbId:%"PRIx64 ", targetId:%"PRIx64, dbFName, dbCache->dbId, dbId);
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
1553
  
D
dapan1121 已提交
1554 1555 1556
  CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbCache, dbFName));

_return:
D
dapan1121 已提交
1557
  
D
dapan1121 已提交
1558 1559
  taosHashRelease(pCatalog->dbCache, dbCache);

D
dapan1121 已提交
1560
  CTG_RET(code);
D
dapan1121 已提交
1561 1562
}

D
dapan 已提交
1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584 1585 1586 1587 1588
int32_t catalogRemoveSTableMeta(struct SCatalog* pCatalog, const char* dbName, const char* stbName, uint64_t suid) {
  int32_t code = 0;
  bool removed = false;
  
  if (NULL == pCatalog || NULL == dbName || NULL == stbName) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  if (NULL == pCatalog->dbCache) {
    return TSDB_CODE_SUCCESS;
  }
  
  CTG_ERR_RET(ctgValidateAndRemoveStbMeta(pCatalog, dbName, stbName, suid, &removed));
  if (!removed) {
    return TSDB_CODE_SUCCESS;
  }
  
  ctgInfo("stb removed from cache, db:%s, stbName:%s, suid:%"PRIx64, dbName, stbName, suid);

  CTG_ERR_RET(ctgMetaRentRemove(&pCatalog->stbRent, suid, ctgSTableVersionCompare));
  
  ctgDebug("stb removed from rent, db:%s, stbName:%s, suid:%"PRIx64, dbName, stbName, suid);
  
  CTG_RET(code);
}

D
dapan1121 已提交
1589

H
Haojun Liao 已提交
1590
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
D
dapan1121 已提交
1591
  return ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta, -1);
D
dapan1121 已提交
1592
}
D
dapan1121 已提交
1593

D
dapan1121 已提交
1594
int32_t catalogGetSTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
D
dapan1121 已提交
1595
  return ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta, 1);
D
dapan1121 已提交
1596 1597
}

D
dapan1121 已提交
1598 1599 1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618
int32_t catalogUpdateSTableMeta(struct SCatalog* pCatalog, STableMetaRsp *rspMsg) {
  STableMetaOutput output = {0};
  int32_t code = 0;

  strcpy(output.dbFName, rspMsg->dbFName);
  strcpy(output.tbName, rspMsg->tbName);
  
  SET_META_TYPE_TABLE(output.metaType);
  
  CTG_ERR_RET(queryCreateTableMetaFromMsg(rspMsg, true, &output.tbMeta));

  CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, &output));

_return:

  tfree(output.tbMeta);
  
  CTG_RET(code);
}


D
dapan1121 已提交
1619
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) {
1620
  if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) {
D
dapan1121 已提交
1621
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan 已提交
1622 1623
  }

D
dapan1121 已提交
1624
  return ctgRenewTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, isSTable);
1625
}
1626

D
dapan1121 已提交
1627
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable) {
D
dapan1121 已提交
1628
  return ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, true, pTableMeta, isSTable);
D
dapan1121 已提交
1629 1630
}

H
Haojun Liao 已提交
1631 1632
int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgroupList) {
  if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pVgroupList) {
D
dapan1121 已提交
1633
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
1634 1635 1636 1637 1638
  }
  
  STableMeta *tbMeta = NULL;
  int32_t code = 0;
  SVgroupInfo vgroupInfo = {0};
D
dapan1121 已提交
1639
  SCtgDBCache* dbCache = NULL;
D
dapan1121 已提交
1640 1641 1642
  SArray *vgList = NULL;

  *pVgroupList = NULL;
D
dapan1121 已提交
1643
  
D
dapan1121 已提交
1644
  CTG_ERR_JRET(ctgGetTableMeta(pCatalog, pRpc, pMgmtEps, pTableName, false, &tbMeta, -1));
D
dapan1121 已提交
1645

H
Haojun Liao 已提交
1646 1647
  char db[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, db);
D
dapan1121 已提交
1648
  CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, db, false, &dbCache));
D
dapan 已提交
1649

D
dapan 已提交
1650 1651 1652 1653 1654 1655 1656 1657 1658 1659
  // REMOEV THIS ....
  if (0 == tbMeta->vgId) {
    SVgroupInfo vgroup = {0};
    
    catalogGetTableHashVgroup(pCatalog, pRpc, pMgmtEps, pTableName, &vgroup);

    tbMeta->vgId = vgroup.vgId;
  }
  // REMOVE THIS ....

1660
  if (tbMeta->tableType == TSDB_SUPER_TABLE) {
D
dapan1121 已提交
1661
    CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, dbCache->vgInfo, pVgroupList));
D
dapan1121 已提交
1662
  } else {
1663
    int32_t vgId = tbMeta->vgId;
D
dapan1121 已提交
1664
    if (NULL == taosHashGetClone(dbCache->vgInfo->vgHash, &vgId, sizeof(vgId), &vgroupInfo)) {
1665
      ctgError("table's vgId not found in vgroup list, vgId:%d, tbName:%s", vgId, tNameGetTableName(pTableName));
D
dapan 已提交
1666
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);    
1667
    }
D
dapan1121 已提交
1668

D
dapan1121 已提交
1669 1670
    vgList = taosArrayInit(1, sizeof(SVgroupInfo));
    if (NULL == vgList) {
D
dapan1121 已提交
1671
      ctgError("taosArrayInit %d failed", (int32_t)sizeof(SVgroupInfo));
D
dapan 已提交
1672 1673 1674
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);    
    }

D
dapan1121 已提交
1675
    if (NULL == taosArrayPush(vgList, &vgroupInfo)) {
1676
      ctgError("taosArrayPush vgroupInfo to array failed, vgId:%d, tbName:%s", vgId, tNameGetTableName(pTableName));
D
dapan1121 已提交
1677 1678
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
    }
D
dapan 已提交
1679

D
dapan1121 已提交
1680 1681 1682
    *pVgroupList = vgList;
    vgList = NULL;
  }
D
dapan 已提交
1683

D
dapan1121 已提交
1684 1685
_return:
  tfree(tbMeta);
D
dapan 已提交
1686

D
dapan1121 已提交
1687 1688 1689
  if (dbCache) {
    CTG_UNLOCK(CTG_READ, &dbCache->vgLock);
    taosHashRelease(pCatalog->dbCache, dbCache);
D
dapan1121 已提交
1690 1691 1692 1693 1694 1695
  }

  if (vgList) {
    taosArrayDestroy(vgList);
    vgList = NULL;
  }
D
dapan1121 已提交
1696
  
D
dapan1121 已提交
1697
  CTG_RET(code);
D
dapan1121 已提交
1698 1699 1700
}


H
Haojun Liao 已提交
1701
int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter, const SEpSet *pMgmtEps, const SName *pTableName, SVgroupInfo *pVgroup) {
D
dapan1121 已提交
1702
  SCtgDBCache* dbCache = NULL;
D
dapan1121 已提交
1703 1704
  int32_t code = 0;

H
Haojun Liao 已提交
1705 1706
  char db[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, db);
D
dapan1121 已提交
1707

D
dapan1121 已提交
1708
  CTG_ERR_RET(ctgGetDBVgroup(pCatalog, pTransporter, pMgmtEps, db, false, &dbCache));
D
dapan1121 已提交
1709

D
dapan1121 已提交
1710
  CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCatalog, dbCache->vgInfo, pTableName, pVgroup));
D
dapan1121 已提交
1711

D
dapan1121 已提交
1712
_return:
D
dapan1121 已提交
1713

D
dapan1121 已提交
1714 1715 1716
  if (dbCache) {
    CTG_UNLOCK(CTG_READ, &dbCache->vgLock);  
    taosHashRelease(pCatalog->dbCache, dbCache);
D
dapan1121 已提交
1717
  }
D
dapan1121 已提交
1718

D
dapan1121 已提交
1719
  CTG_RET(code);
D
dapan1121 已提交
1720 1721 1722
}


H
Haojun Liao 已提交
1723 1724
int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp) {
  if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pReq || NULL == pRsp) {
D
dapan1121 已提交
1725
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan 已提交
1726
  }
D
dapan1121 已提交
1727 1728 1729 1730 1731

  int32_t code = 0;

  if (pReq->pTableName) {
    int32_t tbNum = (int32_t)taosArrayGetSize(pReq->pTableName);
D
dapan1121 已提交
1732
    if (tbNum <= 0) {
D
dapan1121 已提交
1733
      ctgError("empty table name list, tbNum:%d", tbNum);
D
dapan1121 已提交
1734 1735
      CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
    }
H
Haojun Liao 已提交
1736

D
dapan1121 已提交
1737 1738
    pRsp->pTableMeta = taosArrayInit(tbNum, POINTER_BYTES);
    if (NULL == pRsp->pTableMeta) {
D
dapan1121 已提交
1739
      ctgError("taosArrayInit %d failed", tbNum);
D
dapan1121 已提交
1740
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
1741 1742 1743 1744 1745 1746
    }
    
    for (int32_t i = 0; i < tbNum; ++i) {
      SName *name = taosArrayGet(pReq->pTableName, i);
      STableMeta *pTableMeta = NULL;
      
H
Haojun Liao 已提交
1747
      CTG_ERR_JRET(ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, name, false, &pTableMeta, -1));
D
dapan1121 已提交
1748 1749 1750 1751 1752 1753 1754 1755 1756 1757 1758 1759

      if (NULL == taosArrayPush(pRsp->pTableMeta, &pTableMeta)) {
        ctgError("taosArrayPush failed, idx:%d", i);
        tfree(pTableMeta);
        CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
      }
    }
  }

  return TSDB_CODE_SUCCESS;

_return:  
D
dapan1121 已提交
1760

D
dapan1121 已提交
1761 1762 1763 1764 1765 1766 1767 1768
  if (pRsp->pTableMeta) {
    int32_t aSize = taosArrayGetSize(pRsp->pTableMeta);
    for (int32_t i = 0; i < aSize; ++i) {
      STableMeta *pMeta = taosArrayGetP(pRsp->pTableMeta, i);
      tfree(pMeta);
    }
    
    taosArrayDestroy(pRsp->pTableMeta);
D
dapan1121 已提交
1769
    pRsp->pTableMeta = NULL;
D
dapan1121 已提交
1770
  }
D
dapan 已提交
1771
  
D
dapan1121 已提交
1772
  CTG_RET(code);
1773
}
D
dapan 已提交
1774

D
dapan1121 已提交
1775 1776
int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray* pQnodeList) {
  if (NULL == pCatalog || NULL == pRpc  || NULL == pMgmtEps || NULL == pQnodeList) {
D
dapan 已提交
1777 1778 1779
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

D
dapan1121 已提交
1780
  //TODO
D
dapan 已提交
1781 1782 1783 1784

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1785 1786 1787 1788 1789
int32_t catalogGetExpiredSTables(struct SCatalog* pCatalog, SSTableMetaVersion **stables, uint32_t *num) {
  if (NULL == pCatalog || NULL == stables || NULL == num) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

D
dapan1121 已提交
1790
  CTG_RET(ctgMetaRentGet(&pCatalog->stbRent, (void **)stables, num, sizeof(SSTableMetaVersion)));
D
dapan1121 已提交
1791 1792 1793 1794 1795 1796 1797 1798 1799 1800
}

int32_t catalogGetExpiredDBs(struct SCatalog* pCatalog, SDbVgVersion **dbs, uint32_t *num) {
  if (NULL == pCatalog || NULL == dbs || NULL == num) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  CTG_RET(ctgMetaRentGet(&pCatalog->dbRent, (void **)dbs, num, sizeof(SDbVgVersion)));
}

D
dapan 已提交
1801

D
dapan 已提交
1802
void catalogDestroy(void) {
D
dapan1121 已提交
1803 1804 1805 1806 1807 1808 1809 1810 1811 1812 1813 1814 1815 1816
  if (NULL == ctgMgmt.pCluster) {
    return;
  }

  SCatalog *pCatalog = NULL;
  void *pIter = taosHashIterate(ctgMgmt.pCluster, NULL);
  while (pIter) {
    pCatalog = *(SCatalog **)pIter;

    if (pCatalog) {
      catalogFreeHandle(pCatalog);
    }
    
    pIter = taosHashIterate(ctgMgmt.pCluster, pIter);
D
dapan 已提交
1817
  }
D
dapan1121 已提交
1818 1819 1820
  
  taosHashCleanup(ctgMgmt.pCluster);
  ctgMgmt.pCluster = NULL;
D
dapan1121 已提交
1821 1822

  qInfo("catalog destroyed");
D
dapan 已提交
1823 1824 1825 1826
}