catalog.c 39.1 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
H
Haojun Liao 已提交
14 15
 */

D
dapan1121 已提交
16
#include "trpc.h"
D
dapan1121 已提交
17
#include "query.h"
D
dapan1121 已提交
18
#include "tname.h"
H
Haojun Liao 已提交
19
#include "catalogInt.h"
20

D
dapan1121 已提交
21 22
/* File-scope catalog management state, zero-initialized; catalogInit() fills in cfg and pCluster. */
SCatalogMgmt ctgMgmt = {0};

D
dapan1121 已提交
23
/*
 * Look up a database's vgroup info in the catalog's db cache.
 *
 * pCatalog  catalog handle whose dbCache is searched
 * dbName    NUL-terminated db name used as the hash key
 * dbInfo    out: set to the cached entry on a hit, untouched on a miss
 * inCache   out: true on a hit, false on a miss
 *
 * Always returns TSDB_CODE_SUCCESS. On a hit the function returns without
 * releasing the read lock on the entry or the reference obtained from
 * taosHashAcquire(); presumably the caller is expected to release both.
 */
int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, SDBVgroupInfo **dbInfo, bool *inCache) {
  SDBVgroupInfo *entry = NULL;

  if (NULL == pCatalog->dbCache.cache) {
    *inCache = false;
    ctgWarn("empty db cache, dbName:%s", dbName);
    return TSDB_CODE_SUCCESS;
  }

  for (;;) {
    entry = taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName));
    if (NULL == entry) {
      *inCache = false;
      ctgWarn("not in db vgroup cache, dbName:%s", dbName);
      return TSDB_CODE_SUCCESS;
    }

    CTG_LOCK(CTG_READ, &entry->lock);
    if (NULL != entry->vgInfo) {
      /* usable entry: leave the loop with lock and reference still held */
      break;
    }

    /* entry exists but carries no vgroup table: drop it and look it up again */
    CTG_UNLOCK(CTG_READ, &entry->lock);
    taosHashRelease(pCatalog->dbCache.cache, entry);
    ctgWarn("db cache vgInfo is NULL, dbName:%s", dbName);
  }

  *dbInfo = entry;
  *inCache = true;

  ctgDebug("Got db vgroup from cache, dbName:%s", dbName);

  return TSDB_CODE_SUCCESS;
}



/*
 * Ask the mnode for a database's vgroup layout (TDMT_MND_USE_DB).
 *
 * pRpc      transport handle passed to rpcSendRecv()
 * pMgmtEps  endpoint set the request is sent to
 * input     request parameters; input->db is used for logging
 * out       filled by the TDMT_MND_USE_DB response handler
 *
 * Returns TSDB_CODE_SUCCESS, or the failing code from message building,
 * the rpc response, or response processing.
 */
int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SBuildUseDBInput *input, SUseDbOutput *out) {
  char   *reqBuf = NULL;
  int32_t reqLen = 0;

  ctgDebug("try to get db vgroup from mnode, db:%s", input->db);

  int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)](input, &reqBuf, 0, &reqLen);
  if (0 != code) {
    ctgError("Build use db msg failed, code:%x, db:%s", code, input->db);
    CTG_ERR_RET(code);
  }

  SRpcMsg request = {0};
  request.msgType = TDMT_MND_USE_DB;
  request.pCont   = reqBuf;
  request.contLen = reqLen;

  SRpcMsg response = {0};

  rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &request, &response);
  if (TSDB_CODE_SUCCESS != response.code) {
    ctgError("error rsp for use db, code:%x, db:%s", response.code, input->db);
    CTG_ERR_RET(response.code);
  }

  code = queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)](out, response.pCont, response.contLen);
  if (0 != code) {
    ctgError("Process use db rsp failed, code:%x, db:%s", code, input->db);
    CTG_ERR_RET(code);
  }

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
98 99


H
Haojun Liao 已提交
100
/*
 * Fetch a table's meta from the catalog cache as a heap-allocated copy.
 *
 * pCatalog    catalog handle whose tableCache is searched
 * pTableName  table to look up (its full name is the hash key)
 * pTableMeta  out: set to a cloned STableMeta the caller must free, or NULL
 * exist       out: 1 when a usable meta was found, 0 otherwise
 *
 * For a child table the clone is enlarged and the super table's part
 * (from sversion onward) is copied in from stableCache under stableLock.
 *
 * Returns TSDB_CODE_SUCCESS on hit or miss, TSDB_CODE_CTG_INTERNAL_ERROR when
 * the super table entry has a different suid, TSDB_CODE_CTG_MEM_ERROR when the
 * clone cannot be enlarged. On every failure *pTableMeta is freed and NULL.
 */
int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableName, STableMeta** pTableMeta, int32_t *exist) {
  if (NULL == pCatalog->tableCache.cache) {
    *exist = 0;
    ctgWarn("empty tablemeta cache, tbName:%s", pTableName->tname);
    return TSDB_CODE_SUCCESS;
  }

  char tbFullName[TSDB_TABLE_FNAME_LEN];
  tNameExtractFullName(pTableName, tbFullName);

  *pTableMeta = NULL;

  size_t sz = 0;
  STableMeta *tbMeta = taosHashGetCloneExt(pCatalog->tableCache.cache, tbFullName, strlen(tbFullName), NULL, (void **)pTableMeta, &sz);

  if (NULL == *pTableMeta) {
    *exist = 0;
    ctgDebug("tablemeta not in cache, tbName:%s", tbFullName);
    return TSDB_CODE_SUCCESS;
  }

  *exist = 1;

  if (tbMeta->tableType != TSDB_CHILD_TABLE) {
    ctgDebug("Got tablemeta from cache, tbName:%s", tbFullName);

    return TSDB_CODE_SUCCESS;
  }

  /* Child table: the schema lives in the super table entry, keyed by suid. */
  CTG_LOCK(CTG_READ, &pCatalog->tableCache.stableLock);

  STableMeta **stbMeta = taosHashGet(pCatalog->tableCache.stableCache, &tbMeta->suid, sizeof(tbMeta->suid));
  if (NULL == stbMeta || NULL == *stbMeta) {
    CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock);
    ctgError("stable not in stableCache, suid:%"PRIx64, tbMeta->suid);
    tfree(*pTableMeta);
    *exist = 0;
    return TSDB_CODE_SUCCESS;
  }

  if ((*stbMeta)->suid != tbMeta->suid) {
    CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock);
    tfree(*pTableMeta);
    ctgError("stable suid in stableCache mis-match, expected suid:%"PRIx64 ",actual suid:%"PRIx64, tbMeta->suid, (*stbMeta)->suid);
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  int32_t metaSize = sizeof(STableMeta) + ((*stbMeta)->tableInfo.numOfTags + (*stbMeta)->tableInfo.numOfColumns) * sizeof(SSchema);

  /* Grow through a temporary so the clone is not lost (leaked) if realloc fails. */
  STableMeta *grown = realloc(*pTableMeta, metaSize);
  if (NULL == grown) {
    CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock);
    tfree(*pTableMeta);
    *pTableMeta = NULL;
    ctgError("realloc size[%d] failed", metaSize);
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
  }
  *pTableMeta = grown;

  /* Copy everything from sversion onward; the bytes before it (sizeof(SCTableMeta)) stay the child's own. */
  memcpy(&(*pTableMeta)->sversion, &(*stbMeta)->sversion, metaSize - sizeof(SCTableMeta));

  CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock);

  ctgDebug("Got tablemeta from cache, tbName:%s", tbFullName);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
164 165
/*
 * Read a table's type from the cached table meta.
 *
 * pCatalog    catalog handle whose tableCache is searched
 * pTableName  table to look up (its full name is the hash key)
 * tbType      out: set to the cached tableType on a hit, untouched on a miss
 *
 * Always returns TSDB_CODE_SUCCESS. The temporary clone produced by
 * taosHashGetCloneExt() is released before returning.
 */
int32_t ctgGetTableTypeFromCache(struct SCatalog* pCatalog, const SName* pTableName, int32_t *tbType) {
  if (NULL == pCatalog->tableCache.cache) {
    ctgWarn("empty tablemeta cache, tbName:%s", pTableName->tname);  
    return TSDB_CODE_SUCCESS;
  }

  char tbFullName[TSDB_TABLE_FNAME_LEN];
  tNameExtractFullName(pTableName, tbFullName);

  size_t sz = 0;
  STableMeta *pTableMeta = NULL;

  taosHashGetCloneExt(pCatalog->tableCache.cache, tbFullName, strlen(tbFullName), NULL, (void **)&pTableMeta, &sz);

  if (NULL == pTableMeta) {
    ctgWarn("tablemeta not in cache, tbName:%s", tbFullName);  

    return TSDB_CODE_SUCCESS;
  }

  *tbType = pTableMeta->tableType;

  /* The clone was only needed to read the type; free it so it does not leak. */
  tfree(pTableMeta);

  ctgDebug("Got tabletype from cache, tbName:%s, type:%d", tbFullName, *tbType);  

  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
192 193 194 195 196 197 198 199 200 201
/*
 * Build an endpoint set from a vgroup's endpoint list.
 * Copies numOfEps and each endpoint's fqdn/port, and selects endpoint 0 (inUse = 0).
 */
void ctgGenEpSet(SEpSet *epSet, SVgroupInfo *vgroupInfo) {
  epSet->numOfEps = vgroupInfo->numOfEps;
  epSet->inUse = 0;

  int32_t idx = 0;
  while (idx < vgroupInfo->numOfEps) {
    memcpy(&epSet->fqdn[idx], &vgroupInfo->epAddr[idx].fqdn, sizeof(epSet->fqdn[idx]));
    memcpy(&epSet->port[idx], &vgroupInfo->epAddr[idx].port, sizeof(epSet->port[idx]));
    ++idx;
  }
}

D
dapan1121 已提交
202 203 204 205 206 207 208 209
/*
 * Request a table's meta from the mnode (TDMT_MND_STB_META).
 *
 * pRpc        transport handle passed to rpcSendRecv()
 * pMgmtEps    endpoint set the request is sent to
 * pTableName  table whose full name is sent in the request
 * output      filled by the response handler; metaNum is set to 0 when the
 *             response code satisfies CTG_TABLE_NOT_EXIST
 *
 * Returns TSDB_CODE_CTG_INVALID_INPUT for NULL arguments, TSDB_CODE_SUCCESS
 * on success or "table not exist", otherwise the failing code.
 */
int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, STableMetaOutput* output) {
  if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == output) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  char tbFullName[TSDB_TABLE_FNAME_LEN];
  tNameExtractFullName(pTableName, tbFullName);

  /* vgId 0 and a NULL dbName: only the full table name is supplied */
  SBuildTableMetaInput bInput = {0};
  bInput.tableFullName = tbFullName;

  char   *reqBuf = NULL;
  int32_t reqLen = 0;

  ctgDebug("try to get table meta from mnode, tbName:%s", tbFullName);

  int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_MND_STB_META)](&bInput, &reqBuf, 0, &reqLen);
  if (0 != code) {
    ctgError("Build mnode stablemeta msg failed, code:%x", code);
    CTG_ERR_RET(code);
  }

  SRpcMsg request = {0};
  request.msgType = TDMT_MND_STB_META;
  request.pCont   = reqBuf;
  request.contLen = reqLen;

  SRpcMsg response = {0};

  rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &request, &response);

  if (TSDB_CODE_SUCCESS != response.code && CTG_TABLE_NOT_EXIST(response.code)) {
    /* a missing table is reported as success with zero metas */
    output->metaNum = 0;
    ctgDebug("stablemeta not exist in mnode, tbName:%s", tbFullName);
    return TSDB_CODE_SUCCESS;
  }

  if (TSDB_CODE_SUCCESS != response.code) {
    ctgError("error rsp for stablemeta from mnode, code:%x, tbName:%s", response.code, tbFullName);
    CTG_ERR_RET(response.code);
  }

  code = queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_STB_META)](output, response.pCont, response.contLen);
  if (0 != code) {
    ctgError("Process mnode stablemeta rsp failed, code:%x, tbName:%s", code, tbFullName);
    CTG_ERR_RET(code);
  }

  ctgDebug("Got table meta from mnode, tbName:%s", tbFullName);

  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
256 257
/*
 * Request a table's meta from the vnode group that owns it (TDMT_VND_TABLE_META).
 *
 * pRpc        transport handle passed to rpcSendRecv()
 * pMgmtEps    only checked for NULL here; the request goes to the vgroup's endpoints
 * pTableName  table to query (db full name + table name are sent)
 * vgroupInfo  target vgroup; supplies vgId and the endpoint list
 * output      filled by the response handler; metaNum is set to 0 when the
 *             response code satisfies CTG_TABLE_NOT_EXIST
 *
 * Returns TSDB_CODE_CTG_INVALID_INPUT for NULL arguments, TSDB_CODE_SUCCESS
 * on success or "table not exist", otherwise the failing code.
 */
int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) {
  if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == vgroupInfo || NULL == output) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  char dbFullName[TSDB_DB_FNAME_LEN];
  tNameGetFullDbName(pTableName, dbFullName);

  ctgDebug("try to get table meta from vnode, db:%s, tbName:%s", dbFullName, pTableName->tname);

  SBuildTableMetaInput bInput = {0};
  bInput.vgId          = vgroupInfo->vgId;
  bInput.dbName        = dbFullName;
  bInput.tableFullName = (char *)pTableName->tname;

  char   *reqBuf = NULL;
  int32_t reqLen = 0;

  int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)](&bInput, &reqBuf, 0, &reqLen);
  if (0 != code) {
    ctgError("Build vnode tablemeta msg failed, code:%x, tbName:%s", code, pTableName->tname);
    CTG_ERR_RET(code);
  }

  SRpcMsg request = {0};
  request.msgType = TDMT_VND_TABLE_META;
  request.pCont   = reqBuf;
  request.contLen = reqLen;

  SRpcMsg response = {0};
  SEpSet  epSet;

  ctgGenEpSet(&epSet, vgroupInfo);
  rpcSendRecv(pRpc, &epSet, &request, &response);

  if (TSDB_CODE_SUCCESS != response.code && CTG_TABLE_NOT_EXIST(response.code)) {
    /* a missing table is reported as success with zero metas */
    output->metaNum = 0;
    ctgDebug("tablemeta not exist in vnode, tbName:%s", pTableName->tname);
    return TSDB_CODE_SUCCESS;
  }

  if (TSDB_CODE_SUCCESS != response.code) {
    ctgError("error rsp for table meta from vnode, code:%x, tbName:%s", response.code, pTableName->tname);
    CTG_ERR_RET(response.code);
  }

  code = queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)](output, response.pCont, response.contLen);
  if (0 != code) {
    ctgError("Process vnode tablemeta rsp failed, code:%x, tbName:%s", code, pTableName->tname);
    CTG_ERR_RET(code);
  }

  ctgDebug("Got table meta from vnode, db:%s, tbName:%s", dbFullName, pTableName->tname);

  return TSDB_CODE_SUCCESS;
}


312 313
/*
 * Select the table-name hash function for a db's hash method.
 * Every hashMethod value currently maps to MurmurHash3_32.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgGetHashFunction(int8_t hashMethod, tableNameHashFp *fp) {
  (void)hashMethod;  /* no method-specific choice yet */

  *fp = MurmurHash3_32;

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
322
/*
 * Copy every vgroup of a cached db into a newly created array.
 *
 * dbInfo      cached db entry whose vgInfo hash is iterated
 * vgroupList  out: on success, an SArray of SVgroupInfo owned by the caller
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CTG_MEM_ERROR when the array cannot
 * be created or grown (the partial array is destroyed in that case).
 */
int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, SDBVgroupInfo *dbInfo, SArray** vgroupList) {
  int32_t code = 0;
  int32_t vgNum = taosHashGetSize(dbInfo->vgInfo);

  SArray *vgList = taosArrayInit(vgNum, sizeof(SVgroupInfo));
  if (NULL == vgList) {
    ctgError("taosArrayInit failed, num:%d", vgNum);
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);    
  }

  for (void *pIter = taosHashIterate(dbInfo->vgInfo, NULL); NULL != pIter; pIter = taosHashIterate(dbInfo->vgInfo, pIter)) {
    SVgroupInfo *vgInfo = pIter;

    if (NULL == taosArrayPush(vgList, vgInfo)) {
      ctgError("taosArrayPush failed, vgId:%d", vgInfo->vgId);
      /* NOTE(review): this leaves the hash iteration mid-way; confirm whether the iterator must be cancelled */
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
    }
  }

  /* hand ownership to the caller */
  *vgroupList = vgList;
  vgList = NULL;

  ctgDebug("Got vg list from DB, vgNum:%d", vgNum);

  return TSDB_CODE_SUCCESS;

_return:

  if (NULL != vgList) {
    taosArrayDestroy(vgList);
  }

  CTG_RET(code);
}

D
dapan1121 已提交
364
/*
 * Find the vgroup whose hash range contains a table's name hash.
 *
 * dbInfo      cached db entry; supplies the hash method and the vgroup hash
 * pTableName  table to place; its full name is hashed
 * pVgroup     out: copy of the matching vgroup (written only on success)
 *
 * Returns TSDB_CODE_SUCCESS on a match, TSDB_CODE_TSC_DB_NOT_SELECTED when the
 * db has no vgroups, TSDB_CODE_CTG_INTERNAL_ERROR when no vgroup range covers
 * the hash value, or the error from ctgGetHashFunction().
 */
int32_t ctgGetVgInfoFromHashValue(struct SCatalog *pCatalog, SDBVgroupInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup) {
  int32_t code = 0;

  int32_t vgNum = taosHashGetSize(dbInfo->vgInfo);
  char db[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, db);

  if (vgNum <= 0) {
    ctgError("db vgroup cache invalid, db:%s, vgroup number:%d", db, vgNum);
    CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED);
  }

  tableNameHashFp fp = NULL;
  SVgroupInfo *vgInfo = NULL;

  CTG_ERR_JRET(ctgGetHashFunction(dbInfo->hashMethod, &fp));

  char tbFullName[TSDB_TABLE_FNAME_LEN];
  tNameExtractFullName(pTableName, tbFullName);

  uint32_t hashValue = (*fp)(tbFullName, (uint32_t)strlen(tbFullName));

  void *pIter = taosHashIterate(dbInfo->vgInfo, NULL);
  while (pIter) {
    vgInfo = pIter;
    if (hashValue >= vgInfo->hashBegin && hashValue <= vgInfo->hashEnd) {
      /* NOTE(review): leaves the iteration early; confirm whether the hash iterator must be cancelled */
      break;
    }

    pIter = taosHashIterate(dbInfo->vgInfo, pIter);
    vgInfo = NULL;
  }

  if (NULL == vgInfo) {
    ctgError("no hash range found for hashvalue[%u], db:%s", hashValue, db);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  *pVgroup = *vgInfo;

_return:

  /* Propagate the recorded error instead of unconditionally reporting success. */
  CTG_RET(code);
}

D
dapan1121 已提交
409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425
int32_t ctgSTableVersionCompare(const void* key1, const void* key2) {
  if (((SSTableMetaVersion*)key1)->suid < ((SSTableMetaVersion*)key2)->suid) {
    return -1;
  } else if (((SSTableMetaVersion*)key1)->suid > ((SSTableMetaVersion*)key2)->suid) {
    return 1;
  } else {
    return 0;
  }
}

int32_t ctgDbVgVersionCompare(const void* key1, const void* key2) {
  if (((SDbVgVersion*)key1)->dbId < ((SDbVgVersion*)key2)->dbId) {
    return -1;
  } else if (((SDbVgVersion*)key1)->dbId > ((SDbVgVersion*)key2)->dbId) {
    return 1;
  } else {
    return 0;
D
dapan1121 已提交
426
  }
D
dapan1121 已提交
427 428 429 430 431 432 433 434 435
}


/*
 * Initialize a meta rent manager.
 *
 * mgmt     manager to set up; its slot array is allocated zero-filled
 * rentSec  rent period in seconds; slot count is rentSec / CTG_RENT_SLOT_SECOND
 * type     stored in mgmt->type and used in log messages
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CTG_MEM_ERROR when calloc returns NULL.
 */
int32_t ctgMetaRentInit(SMetaRentMgmt *mgmt, uint32_t rentSec, int8_t type) {
  mgmt->type = type;
  mgmt->slotRIdx = 0;
  mgmt->slotNum = rentSec / CTG_RENT_SLOT_SECOND;

  size_t slotBytes = sizeof(SRentSlotInfo) * mgmt->slotNum;

  mgmt->slots = calloc(1, slotBytes);
  if (NULL == mgmt->slots) {
    qError("calloc %d failed", (int32_t)slotBytes);
    return TSDB_CODE_CTG_MEM_ERROR;
  }

  qDebug("meta rent initialized, type:%d, slotNum:%d", type, mgmt->slotNum);

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
447

D
dapan1121 已提交
448 449

/*
 * Append one meta record to the rent slot selected by id.
 *
 * mgmt  rent manager; the slot index is |id % slotNum|
 * meta  record to copy into the slot's array
 * id    identifier used for slot selection and logging
 * size  element size used when the slot's array is first created
 *
 * The slot's write lock is taken once and released exactly once at _return,
 * on success and on every error path. Marks the slot as needing a sort.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CTG_MEM_ERROR when the array cannot
 * be created or grown.
 */
int32_t ctgMetaRentAdd(SMetaRentMgmt *mgmt, void *meta, int64_t id, int32_t size) {
  int16_t widx = abs(id % mgmt->slotNum);

  SRentSlotInfo *slot = &mgmt->slots[widx];
  int32_t code = 0;

  CTG_LOCK(CTG_WRITE, &slot->lock);
  if (NULL == slot->meta) {
    slot->meta = taosArrayInit(CTG_DEFAULT_RENT_SLOT_SIZE, size);
    if (NULL == slot->meta) {
      /* no unlock here: _return releases the lock */
      qError("taosArrayInit %d failed, id:%"PRIx64", slot idx:%d, type:%d", CTG_DEFAULT_RENT_SLOT_SIZE, id, widx, mgmt->type);
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
    }
  }

  if (NULL == taosArrayPush(slot->meta, meta)) {
    /* no unlock here: _return releases the lock */
    qError("taosArrayPush meta to rent failed, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
  }

  slot->needSort = true;

  qDebug("add meta to rent, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);

_return:

  CTG_UNLOCK(CTG_WRITE, &slot->lock);
  CTG_RET(code);
}

/*
 * Overwrite an existing meta record in the rent slot selected by id.
 *
 * mgmt     rent manager; the slot index is |id % slotNum|
 * meta     new record contents (size bytes are copied over the old record)
 * id       identifier used for slot selection, lookup key and logging
 * size     number of bytes to copy
 * compare  comparator used to sort the slot (when flagged) and to search it
 *
 * The slot's write lock is taken once and released exactly once at _return.
 * If the slot is empty or the record is not found, the record is added via
 * ctgMetaRentAdd() after the lock has been released.
 *
 * Returns TSDB_CODE_SUCCESS, or the result of the fallback ctgMetaRentAdd().
 */
int32_t ctgMetaRentUpdate(SMetaRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t compare) {
  int16_t widx = abs(id % mgmt->slotNum);

  SRentSlotInfo *slot = &mgmt->slots[widx];
  int32_t code = 0;

  CTG_LOCK(CTG_WRITE, &slot->lock);
  if (NULL == slot->meta) {
    /* no unlock here: _return releases the lock */
    qError("meta in slot is empty, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
  }

  if (slot->needSort) {
    taosArraySort(slot->meta, compare);
    slot->needSort = false;
    qDebug("slot meta sorted, slot idx:%d, type:%d", widx, mgmt->type);
  }

  void *orig = taosArraySearch(slot->meta, &id, compare, TD_EQ);
  if (NULL == orig) {
    /* no unlock here: _return releases the lock */
    qError("meta not found in slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  memcpy(orig, meta, size);

  qDebug("meta in rent updated, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);

_return:

  CTG_UNLOCK(CTG_WRITE, &slot->lock);

  if (code) {
    /* the add path takes the slot lock itself, so it must run after the unlock above */
    qWarn("meta in rent update failed, will try to add it, code:%x, id:%"PRIx64", slot idx:%d, type:%d", code, id, widx, mgmt->type);
    CTG_RET(ctgMetaRentAdd(mgmt, meta, id, size));
  }

  CTG_RET(code);
}

/*
 * Copy out all meta records of the next rent slot (round-robin over slots).
 *
 * mgmt  rent manager; slotRIdx is advanced atomically and wrapped at slotNum
 * res   out: malloc'ed buffer holding the slot's records (caller frees),
 *       or NULL when the slot is empty or allocation fails
 * num   out: number of records in *res, 0 when *res is NULL
 * size  size in bytes of one record
 *
 * Returns TSDB_CODE_SUCCESS (also for an empty slot), or
 * TSDB_CODE_CTG_MEM_ERROR when malloc returns NULL.
 */
int32_t ctgMetaRentGetImpl(SMetaRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
  /* Define both outputs up front so every exit path leaves them in a known state. */
  *res = NULL;
  *num = 0;

  int16_t ridx = atomic_add_fetch_16(&mgmt->slotRIdx, 1);
  if (ridx >= mgmt->slotNum) {
    ridx %= mgmt->slotNum;
    atomic_store_16(&mgmt->slotRIdx, ridx);
  }

  SRentSlotInfo *slot = &mgmt->slots[ridx];
  int32_t code = 0;

  CTG_LOCK(CTG_READ, &slot->lock);
  if (NULL == slot->meta) {
    qDebug("empty meta in slot:%d, type:%d", ridx, mgmt->type);
    goto _return;
  }

  size_t metaNum = taosArrayGetSize(slot->meta);
  if (metaNum <= 0) {
    qDebug("no meta in slot:%d, type:%d", ridx, mgmt->type);
    goto _return;
  }

  size_t msize = metaNum * size;
  *res = malloc(msize);
  if (NULL == *res) {
    qError("malloc %d failed", (int32_t)msize);
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
  }

  /* copies metaNum * size bytes starting at element 0 of the slot's array */
  void *meta = taosArrayGet(slot->meta, 0);

  memcpy(*res, meta, msize);

  *num = (uint32_t)metaNum;

  qDebug("Got %d meta from rent, type:%d", (int32_t)metaNum, mgmt->type);

_return:

  CTG_UNLOCK(CTG_READ, &slot->lock);

  CTG_RET(code);
}

/*
 * Rate-limited wrapper around ctgMetaRentGetImpl().
 *
 * If less than CTG_RENT_SLOT_SECOND seconds have passed since lastReadMsec,
 * sets *res = NULL and *num = 0 and returns TSDB_CODE_SUCCESS. Otherwise it
 * claims the read by compare-and-swapping lastReadMsec to the current time
 * (retrying when another update got in first) and delegates to the Impl.
 */
int32_t ctgMetaRentGet(SMetaRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
  int64_t nowMs = 0;
  int64_t lastMs = 0;

  do {
    nowMs = taosGetTimestampMs();
    lastMs = atomic_load_64(&mgmt->lastReadMsec);

    if ((nowMs - lastMs) < CTG_RENT_SLOT_SECOND * 1000) {
      *res = NULL;
      *num = 0;
      qDebug("too short time period to get expired meta, type:%d", mgmt->type);
      return TSDB_CODE_SUCCESS;
    }
    /* loop again if lastReadMsec changed between the load and the exchange */
  } while (lastMs != atomic_val_compare_exchange_64(&mgmt->lastReadMsec, lastMs, nowMs));

  CTG_ERR_RET(ctgMetaRentGetImpl(mgmt, res, num, size));

  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
593

D
dapan1121 已提交
594
/*
 * Store the table meta(s) from a meta response into the catalog caches.
 *
 * pCatalog  catalog handle; tableCache.cache and tableCache.stableCache are
 *           created and installed here on first use
 * output    response data: metaNum must be 1 or 2 and tbMeta non-NULL.
 *           With metaNum == 2, ctbFname/ctbMeta (child table) is cached too
 *           and tbMeta must be a super table.
 *
 * A super table is put into the name cache and indexed by suid in stableCache
 * under stableLock, then registered (or refreshed) in the stable rent.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_CTG_INTERNAL_ERROR for an inconsistent
 * response, TSDB_CODE_CTG_MEM_ERROR when a hash table cannot be created or
 * written, or the error from the rent add/update.
 */
int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output) {
  int32_t code = 0;

  if (output->metaNum != 1 && output->metaNum != 2) {
    ctgError("invalid table meta number in meta rsp, num:%d", output->metaNum);
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  if (NULL == output->tbMeta) {
    ctgError("no valid table meta got from meta rsp, tbName:%s", output->tbFname);
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  if (NULL == pCatalog->tableCache.cache) {
    SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
    if (NULL == cache) {
      ctgError("taosHashInit failed, num:%d", ctgMgmt.cfg.maxTblCacheNum);
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }

    /* Install the new table; if one was installed concurrently, discard ours. */
    if (NULL != atomic_val_compare_exchange_ptr(&pCatalog->tableCache.cache, NULL, cache)) {
      taosHashCleanup(cache);
    }
  }

  if (NULL == pCatalog->tableCache.stableCache) {
    SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
    if (NULL == cache) {
      ctgError("taosHashInit failed, num:%d", ctgMgmt.cfg.maxTblCacheNum);
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }

    /* Install the new table; if one was installed concurrently, discard ours. */
    if (NULL != atomic_val_compare_exchange_ptr(&pCatalog->tableCache.stableCache, NULL, cache)) {
      taosHashCleanup(cache);
    }
  }

  if (output->metaNum == 2) {
    if (taosHashPut(pCatalog->tableCache.cache, output->ctbFname, strlen(output->ctbFname), &output->ctbMeta, sizeof(output->ctbMeta)) != 0) {
      ctgError("taosHashPut ctablemeta to cache failed, ctbName:%s", output->ctbFname);
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }

    ctgDebug("update tablemeta to cache, tbName:%s", output->ctbFname);

    if (TSDB_SUPER_TABLE != output->tbMeta->tableType) {
      ctgError("table type error, expected:%d, actual:%d", TSDB_SUPER_TABLE, output->tbMeta->tableType);
      CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
    }    
  }

  /* fixed header plus one SSchema per column and per tag */
  int32_t tbSize = sizeof(*output->tbMeta) + sizeof(SSchema) * (output->tbMeta->tableInfo.numOfColumns + output->tbMeta->tableInfo.numOfTags);

  if (TSDB_SUPER_TABLE == output->tbMeta->tableType) {
    bool newAdded = false;
    SSTableMetaVersion metaRent = {.suid = output->tbMeta->suid, .sversion = output->tbMeta->sversion, .tversion = output->tbMeta->tversion};

    CTG_LOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
    if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) {
      CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
      ctgError("taosHashPut tablemeta to cache failed, tbName:%s", output->tbFname);
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }

    /* stableCache stores a pointer to the entry just put into the name cache */
    STableMeta *tbMeta = taosHashGet(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname));
    if (taosHashPutExt(pCatalog->tableCache.stableCache, &output->tbMeta->suid, sizeof(output->tbMeta->suid), &tbMeta, POINTER_BYTES, &newAdded) != 0) {
      CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
      ctgError("taosHashPutExt stable to stable cache failed, suid:%"PRIx64, output->tbMeta->suid);
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }
    CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);

    ctgDebug("update stable to cache, suid:%"PRIx64, output->tbMeta->suid);

    if (newAdded) {
      CTG_ERR_RET(ctgMetaRentAdd(&pCatalog->stableRent, &metaRent, metaRent.suid, sizeof(SSTableMetaVersion)));
    } else {
      CTG_ERR_RET(ctgMetaRentUpdate(&pCatalog->stableRent, &metaRent, metaRent.suid, sizeof(SSTableMetaVersion), ctgSTableVersionCompare));
    }
  } else {
    if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) {
      ctgError("taosHashPut tablemeta to cache failed, tbName:%s", output->tbFname);
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }
  }

  ctgDebug("update tablemeta to cache, tbName:%s", output->tbFname);

  CTG_RET(code);
}

D
dapan1121 已提交
677

D
dapan1121 已提交
678 679
/*
 * Get a db's vgroup info, from cache when allowed, otherwise from the mnode.
 *
 * forceUpdate  0: try the cache first; non-zero: always ask the mnode
 * dbInfo       out: cached entry as returned by ctgGetDBVgroupFromCache()
 *
 * After a mnode fetch the result is pushed into the cache with
 * catalogUpdateDBVgroup() and read back; the fetch is repeated until the
 * read-back hits. Returns TSDB_CODE_SUCCESS or the first failing code.
 */
int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo** dbInfo) {
  bool inCache = false;

  if (0 == forceUpdate) {
    CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &inCache));

    if (inCache) {
      return TSDB_CODE_SUCCESS;
    }
  }

  SBuildUseDBInput input = {0};
  SUseDbOutput DbOut = {0};

  /* bounded copy, then force termination in case dbName filled the buffer */
  strncpy(input.db, dbName, sizeof(input.db));
  input.db[sizeof(input.db) - 1] = 0;
  input.vgVersion = CTG_DEFAULT_INVALID_VERSION;

  do {
    CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &input, &DbOut));

    CTG_ERR_RET(catalogUpdateDBVgroup(pCatalog, dbName, &DbOut.dbVgroup));

    CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &inCache));

    if (!inCache) {
      ctgWarn("can't get db vgroup from cache, will retry, db:%s", dbName);
    }
  } while (!inCache);

  return TSDB_CODE_SUCCESS;
}


/*
 * Drop the cached vgroup table of a db when newer vgroup info has arrived.
 *
 * dbName  db whose cache entry is checked
 * dbInfo  incoming info; only its vgVersion is compared here
 *
 * If the db is cached and dbInfo->vgVersion is greater than the cached
 * version, the cached vgInfo hash is cleaned up and set to NULL under the
 * entry's write lock. Otherwise nothing is changed. Always returns
 * TSDB_CODE_SUCCESS.
 */
int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) {
  SDBVgroupInfo *cached = (SDBVgroupInfo *)taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName));
  if (NULL == cached) {
    return TSDB_CODE_SUCCESS;
  }

  CTG_LOCK(CTG_WRITE, &cached->lock);

  if (dbInfo->vgVersion > cached->vgVersion) {
    if (cached->vgInfo) {
      ctgInfo("cleanup db vgInfo, db:%s", dbName);
      taosHashCleanup(cached->vgInfo);
      cached->vgInfo = NULL;
    }
  } else {
    ctgInfo("db vgVersion is not new, db:%s, vgVersion:%d, current:%d", dbName, dbInfo->vgVersion, cached->vgVersion);
  }

  CTG_UNLOCK(CTG_WRITE, &cached->lock);
  taosHashRelease(pCatalog->dbCache.cache, cached);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
740 741 742 743 744 745 746 747 748 749 750 751 752 753 754
int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) {
  if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  SVgroupInfo vgroupInfo = {0};
  int32_t code = 0;

  CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo));

  STableMetaOutput voutput = {0};
  STableMetaOutput moutput = {0};
  STableMetaOutput *output = &voutput;

  if (CTG_IS_STABLE(isSTable)) {
D
dapan1121 已提交
755 756
    ctgDebug("will renew table meta, supposed to be stable, tbName:%s", pTableName->tname);
    
D
dapan1121 已提交
757 758 759 760 761 762 763 764
    CTG_ERR_JRET(ctgGetTableMetaFromMnode(pCatalog, pTransporter, pMgmtEps, pTableName, &moutput));

    if (0 == moutput.metaNum) {
      CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &voutput));
    } else {
      output = &moutput;
    }
  } else {
D
dapan1121 已提交
765 766
    ctgDebug("will renew table meta, not supposed to be stable, tbName:%s, isStable:%d", pTableName->tname, isSTable);
  
D
dapan1121 已提交
767 768 769
    CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &voutput));

    if (voutput.metaNum > 0 && TSDB_SUPER_TABLE == voutput.tbMeta->tableType) {
D
dapan1121 已提交
770 771
      ctgDebug("will continue to renew table meta since got stable, tbName:%s, metaNum:%d", pTableName->tname, voutput.metaNum);
      
D
dapan1121 已提交
772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789
      CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, voutput.tbFname, &moutput));

      tfree(voutput.tbMeta);
      voutput.tbMeta = moutput.tbMeta;
      moutput.tbMeta = NULL;
    }
  }

  CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, output));

_return:

  tfree(voutput.tbMeta);
  tfree(moutput.tbMeta);
  
  CTG_RET(code);
}

D
dapan1121 已提交
790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826
/*
 * Get a table's meta, from cache when allowed, otherwise after a renew.
 *
 * forceUpdate  false: a cached meta whose type matches isSTable is returned
 *              directly; true: the meta is always renewed first
 * pTableMeta   out: heap-allocated meta the caller must free; NULL on failure
 * isSTable     super-table hint; when unknown and forceUpdate is set, it is
 *              derived from the cached table type if available
 *
 * Returns TSDB_CODE_CTG_INVALID_INPUT for NULL arguments,
 * TSDB_CODE_CTG_INTERNAL_ERROR when the meta is missing from the cache right
 * after a successful renew, otherwise the first failing code or success.
 */
int32_t ctgGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, bool forceUpdate, STableMeta** pTableMeta, int32_t isSTable) {
  if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  int32_t exist = 0;

  if (!forceUpdate) {
    CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pTableName, pTableMeta, &exist));

    if (exist && CTG_TBTYPE_MATCH(isSTable, (*pTableMeta)->tableType)) {
      return TSDB_CODE_SUCCESS;
    }

    if (exist) {
      /* Cached copy has the wrong table type: release it, otherwise the
       * second cache lookup below resets the pointer and the copy leaks. */
      tfree(*pTableMeta);
      *pTableMeta = NULL;
      exist = 0;
    }
  } else if (CTG_IS_UNKNOWN_STABLE(isSTable)) {
    int32_t tbType = 0;

    CTG_ERR_RET(ctgGetTableTypeFromCache(pCatalog, pTableName, &tbType));

    CTG_SET_STABLE(isSTable, tbType);
  }

  CTG_ERR_RET(ctgRenewTableMetaImpl(pCatalog, pRpc, pMgmtEps, pTableName, isSTable));

  CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pTableName, pTableMeta, &exist));

  if (0 == exist) {
    ctgError("renew tablemeta succeed but get from cache failed, may be deleted, tbName:%s", pTableName->tname);
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  return TSDB_CODE_SUCCESS;
}

// Intended to release everything owned by a catalog handle.
// NOTE(review): the body is still an empty stub, so callers that use it for
// cleanup (catalogGetHandle's retry/error paths, catalogFreeHandle) currently
// release nothing.
void ctgFreeHandle(struct SCatalog* pCatalog) {
  //TODO
}

D
dapan1121 已提交
827
/*
 * Initialise the process-wide catalog manager (ctgMgmt).
 *
 * cfg - optional tunables; NULL selects the built-in default for every field.
 *       In a supplied cfg, a field equal to zero is replaced by its default.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_CTG_INVALID_INPUT when the cluster
 * cache already exists, or TSDB_CODE_CTG_INTERNAL_ERROR when it cannot be
 * created.
 */
int32_t catalogInit(SCatalogCfg *cfg) {
  if (NULL != ctgMgmt.pCluster) {
    qError("catalog already init");
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  if (NULL == cfg) {
    /* No caller configuration at all: take every default. */
    ctgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
    ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TABLEMETA_NUMBER;
    ctgMgmt.cfg.dbRentSec = CTG_DEFAULT_RENT_SECOND;
    ctgMgmt.cfg.stableRentSec = CTG_DEFAULT_RENT_SECOND;
  } else {
    memcpy(&ctgMgmt.cfg, cfg, sizeof(*cfg));

    /* Patch up each tunable the caller left at zero. */
    if (0 == ctgMgmt.cfg.maxDBCacheNum) {
      ctgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
    }

    if (0 == ctgMgmt.cfg.maxTblCacheNum) {
      ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TABLEMETA_NUMBER;
    }

    if (0 == ctgMgmt.cfg.dbRentSec) {
      ctgMgmt.cfg.dbRentSec = CTG_DEFAULT_RENT_SECOND;
    }

    if (0 == ctgMgmt.cfg.stableRentSec) {
      ctgMgmt.cfg.stableRentSec = CTG_DEFAULT_RENT_SECOND;
    }
  }

  /* The cluster cache is keyed by the 64-bit cluster id. */
  ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == ctgMgmt.pCluster) {
    qError("taosHashInit %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER);
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  qDebug("catalog initialized, maxDb:%u, maxTbl:%u, dbRentSec:%u, stableRentSec:%u", ctgMgmt.cfg.maxDBCacheNum, ctgMgmt.cfg.maxTblCacheNum, ctgMgmt.cfg.dbRentSec, ctgMgmt.cfg.stableRentSec);

  return TSDB_CODE_SUCCESS;
}

869 870
int32_t catalogGetHandle(uint64_t clusterId, struct SCatalog** catalogHandle) {
  if (NULL == catalogHandle) {
D
dapan1121 已提交
871
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan 已提交
872 873 874
  }

  if (NULL == ctgMgmt.pCluster) {
D
dapan1121 已提交
875
    qError("cluster cache are not ready, clusterId:%"PRIx64, clusterId);
D
dapan1121 已提交
876
    CTG_ERR_RET(TSDB_CODE_CTG_NOT_READY);
D
dapan 已提交
877 878
  }

D
dapan1121 已提交
879 880
  int32_t code = 0;
  SCatalog *clusterCtg = NULL;
D
dapan 已提交
881

D
dapan1121 已提交
882 883
  while (true) {
    SCatalog **ctg = (SCatalog **)taosHashGet(ctgMgmt.pCluster, (char*)&clusterId, sizeof(clusterId));
D
dapan 已提交
884

D
dapan1121 已提交
885 886 887 888 889
    if (ctg && (*ctg)) {
      *catalogHandle = *ctg;
      qDebug("got catalog handle from cache, clusterId:%"PRIx64", CTG:%p", clusterId, *ctg);
      return TSDB_CODE_SUCCESS;
    }
D
dapan 已提交
890

D
dapan1121 已提交
891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913
    clusterCtg = calloc(1, sizeof(SCatalog));
    if (NULL == clusterCtg) {
      qError("calloc %d failed", (int32_t)sizeof(SCatalog));
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }

    CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->dbRent, ctgMgmt.cfg.dbRentSec, CTG_RENT_DB));
    CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->stableRent, ctgMgmt.cfg.stableRentSec, CTG_RENT_STABLE));

    code = taosHashPut(ctgMgmt.pCluster, &clusterId, sizeof(clusterId), &clusterCtg, POINTER_BYTES);
    if (code) {
      if (HASH_NODE_EXIST(code)) {
        ctgFreeHandle(clusterCtg);
        continue;
      }
      
      qError("taosHashPut CTG to cache failed, clusterId:%"PRIx64, clusterId);
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
    }

    qDebug("add CTG to cache, clusterId:%"PRIx64", CTG:%p", clusterId, clusterCtg);

    break;
D
dapan 已提交
914
  }
D
dapan1121 已提交
915 916

  *catalogHandle = clusterCtg;
D
dapan 已提交
917
  
D
dapan1121 已提交
918
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
919 920 921 922 923 924 925 926 927 928 929 930 931 932

_return:

  ctgFreeHandle(clusterCtg);
  
  CTG_RET(code);
}

/*
 * Public wrapper around ctgFreeHandle(); a NULL handle is ignored.
 */
void catalogFreeHandle(struct SCatalog* pCatalog) {
  if (NULL != pCatalog) {
    ctgFreeHandle(pCatalog);
  }
}

D
dapan1121 已提交
935 936
int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version) {
  if (NULL == pCatalog || NULL == dbName || NULL == version) {
D
dapan1121 已提交
937
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
938 939 940 941
  }

  if (NULL == pCatalog->dbCache.cache) {
    *version = CTG_DEFAULT_INVALID_VERSION;
D
dapan1121 已提交
942
    ctgInfo("empty db cache, dbName:%s", dbName);
D
dapan1121 已提交
943 944 945
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
946
  SDBVgroupInfo * dbInfo = taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName));
D
dapan1121 已提交
947 948
  if (NULL == dbInfo) {
    *version = CTG_DEFAULT_INVALID_VERSION;
D
dapan1121 已提交
949
    ctgInfo("db not in cache, dbName:%s", dbName);
D
dapan1121 已提交
950 951 952
    return TSDB_CODE_SUCCESS;
  }

953
  *version = dbInfo->vgVersion;
D
dapan1121 已提交
954
  taosHashRelease(pCatalog->dbCache.cache, dbInfo);
D
dapan1121 已提交
955

D
dapan1121 已提交
956 957
  ctgDebug("Got db vgVersion from cache, dbName:%s, vgVersion:%d", dbName, *version);

D
dapan1121 已提交
958 959 960
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
961 962 963 964 965 966 967 968 969 970 971 972 973 974
/*
 * Build an array holding a copy of every vgroup of a database.
 *
 * pCatalog    - catalog handle
 * pRpc        - transport handle, forwarded to ctgGetDBVgroup
 * pMgmtEps    - mnode endpoint set, forwarded to ctgGetDBVgroup
 * dbName      - database to look up
 * forceUpdate - forwarded to ctgGetDBVgroup
 * vgroupList  - out: on success receives a new SArray of SVgroupInfo that the
 *               caller must destroy; left untouched on failure
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_CTG_INVALID_INPUT on a NULL argument,
 * TSDB_CODE_CTG_MEM_ERROR when the array cannot be created or grown, or the
 * error code of ctgGetDBVgroup.
 */
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SArray** vgroupList) {
  if (NULL == pCatalog || NULL == dbName || NULL == pRpc || NULL == pMgmtEps || NULL == vgroupList) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  SDBVgroupInfo* db = NULL;
  int32_t code = 0;
  SArray *vgList = NULL;
  
  CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, dbName, forceUpdate, &db));

  vgList = taosArrayInit(taosHashGetSize(db->vgInfo), sizeof(SVgroupInfo));
  if (NULL == vgList) {
    ctgError("taosArrayInit %d failed", taosHashGetSize(db->vgInfo));
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);    
  }

  void *pIter = taosHashIterate(db->vgInfo, NULL);
  while (pIter) {
    SVgroupInfo *vgInfo = pIter;

    if (NULL == taosArrayPush(vgList, vgInfo)) {
      ctgError("taosArrayPush failed, vgId:%d", vgInfo->vgId);
      // Leaving the walk early: hand the in-flight iterator back to the hash
      // so the entry it currently holds is released.
      taosHashCancelIterate(db->vgInfo, pIter);
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
    }
    
    pIter = taosHashIterate(db->vgInfo, pIter);
  }

  // Ownership of the array moves to the caller; clear the local so the
  // cleanup below does not destroy it.
  *vgroupList = vgList;
  vgList = NULL;

_return:

  if (db) {
    // A non-NULL db is unlocked and released here on every exit path.
    CTG_UNLOCK(CTG_READ, &db->lock);
    taosHashRelease(pCatalog->dbCache.cache, db);
  }

  if (vgList) {
    taosArrayDestroy(vgList);
    vgList = NULL;
  }

  CTG_RET(code);  
}


D
dapan1121 已提交
1011
/*
 * Store (or replace) a database's vgroup information in the catalog cache.
 *
 * pCatalog - catalog handle
 * dbName   - database name used as the cache key
 * dbInfo   - vgroup info to cache. On success the vgInfo hash is handed over
 *            to the cache and dbInfo->vgInfo is set to NULL; on failure the
 *            vgInfo hash is destroyed here. Either way the caller must not
 *            use dbInfo->vgInfo afterwards.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_CTG_INVALID_INPUT on a NULL argument,
 * TSDB_CODE_CTG_MEM_ERROR for an invalid dbInfo (NULL/empty vgInfo or a
 * negative vgVersion) or an allocation/insert failure, or the error code of a
 * failing helper.
 */
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) {
  int32_t code = 0;
  
  if (NULL == pCatalog || NULL == dbName || NULL == dbInfo) {
    CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  // Negative versions are rejected here, so no later code needs to handle them.
  if (NULL == dbInfo->vgInfo || dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgInfo) <= 0) {
    ctgError("invalid db vgInfo, dbName:%s, vgInfo:%p, vgVersion:%d", dbName, dbInfo->vgInfo, dbInfo->vgVersion);
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
  }

  if (NULL == pCatalog->dbCache.cache) {
    SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
    if (NULL == cache) {
      ctgError("taosHashInit %u failed", ctgMgmt.cfg.maxDBCacheNum);
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
    }

    // Install the new cache on the handle; without this the insert below
    // would target a NULL hash and the freshly created one would be lost.
    // NOTE(review): plain store; if handles are updated from several threads
    // this should become a compare-and-swap that frees the loser.
    pCatalog->dbCache.cache = cache;
  } else {
    CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbName, dbInfo));
  }

  bool newAdded = false;
  if (taosHashPutExt(pCatalog->dbCache.cache, dbName, strlen(dbName), dbInfo, sizeof(*dbInfo), &newAdded) != 0) {
    ctgError("taosHashPutExt db vgroup to cache failed, db:%s", dbName);
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
  }

  // The cached copy now references the vgInfo hash; detach it from the
  // caller's struct so the cleanup below leaves it alone.
  dbInfo->vgInfo = NULL;

  SDbVgVersion vgVersion = {.dbId = dbInfo->dbId, .vgVersion = dbInfo->vgVersion};
  if (newAdded) {
    CTG_ERR_JRET(ctgMetaRentAdd(&pCatalog->dbRent, &vgVersion, dbInfo->dbId, sizeof(SDbVgVersion)));
  } else {
    CTG_ERR_JRET(ctgMetaRentUpdate(&pCatalog->dbRent, &vgVersion, dbInfo->dbId, sizeof(SDbVgVersion), ctgDbVgVersionCompare));
  }
  
  ctgDebug("dbName:%s vgroup updated, vgVersion:%d", dbName, dbInfo->vgVersion);


_return:

  // Only reached with a live vgInfo when the hand-over did not happen.
  if (dbInfo && dbInfo->vgInfo) {
    taosHashCleanup(dbInfo->vgInfo);
    dbInfo->vgInfo = NULL;
  }
  
  CTG_RET(code);
}

H
Haojun Liao 已提交
1074
/*
 * Get a table's meta without forcing a refresh.
 * Thin wrapper over ctgGetTableMeta() with forceUpdate=false and a table-kind
 * hint of -1 (presumably "kind unknown" - confirm against CTG_IS_UNKNOWN_STABLE).
 */
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
  const bool    forceUpdate = false;
  const int32_t isSTable = -1;

  return ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, forceUpdate, pTableMeta, isSTable);
}
D
dapan1121 已提交
1077

D
dapan1121 已提交
1078
/*
 * Get a table's meta without forcing a refresh, passing a table-kind hint of 1
 * (presumably "is a super table" - confirm against the CTG_*_STABLE macros).
 * Thin wrapper over ctgGetTableMeta().
 */
int32_t catalogGetSTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
  const bool    forceUpdate = false;
  const int32_t isSTable = 1;

  return ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, forceUpdate, pTableMeta, isSTable);
}

int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) {
1083
  if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) {
D
dapan1121 已提交
1084
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan 已提交
1085 1086
  }

D
dapan1121 已提交
1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102
  SVgroupInfo vgroupInfo = {0};
  int32_t code = 0;

  CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo));

  STableMetaOutput output = {0};
  
  CTG_ERR_RET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &output));

  //CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pTableName, &output));

  CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, &output));

_return:
  tfree(output.tbMeta);
  CTG_RET(code);
1103
}
1104

D
dapan1121 已提交
1105
/*
 * Renew a table's meta and return the refreshed copy.
 * Thin wrapper over ctgGetTableMeta() with forceUpdate=true; isSTable is
 * forwarded unchanged.
 */
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable) {
  const bool forceUpdate = true;

  return ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, forceUpdate, pTableMeta, isSTable);
}

H
Haojun Liao 已提交
1109 1110
/*
 * Return the vgroup(s) that hold a table's data.
 *
 * For a super table the list is produced by ctgGetVgInfoFromDB(); for any
 * other table it is a one-element array holding the vgroup whose id matches
 * the table meta's vgId.
 *
 * pCatalog    - catalog handle
 * pRpc        - transport handle
 * pMgmtEps    - mnode endpoint set
 * pTableName  - table to resolve
 * pVgroupList - out: set to NULL on entry; receives the vgroup array on success
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_CTG_INVALID_INPUT on a NULL argument,
 * TSDB_CODE_CTG_INTERNAL_ERROR when the vgId is unknown or the push fails,
 * TSDB_CODE_CTG_MEM_ERROR when the array cannot be created, or the error code
 * of a failing helper.
 */
int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgroupList) {
  if (!(pCatalog && pRpc && pMgmtEps && pTableName && pVgroupList)) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  int32_t        code = 0;
  int32_t        vgId = 0;
  STableMeta    *pMeta = NULL;
  SDBVgroupInfo *pDbVg = NULL;
  SArray        *pSingle = NULL;
  SVgroupInfo    vgCopy = {0};
  char           dbFName[TSDB_DB_FNAME_LEN] = {0};

  *pVgroupList = NULL;

  CTG_ERR_JRET(ctgGetTableMeta(pCatalog, pRpc, pMgmtEps, pTableName, false, &pMeta, -1));

  tNameGetFullDbName(pTableName, dbFName);
  CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, dbFName, false, &pDbVg));

  if (TSDB_SUPER_TABLE != pMeta->tableType) {
    /* Non-super table: exactly one vgroup, identified by the meta's vgId. */
    vgId = pMeta->vgId;
    if (NULL == taosHashGetClone(pDbVg->vgInfo, &vgId, sizeof(vgId), &vgCopy)) {
      ctgError("table's vgId not found in vgroup list, vgId:%d, tbName:%s", vgId, pTableName->tname);
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
    }

    pSingle = taosArrayInit(1, sizeof(SVgroupInfo));
    if (NULL == pSingle) {
      ctgError("taosArrayInit %d failed", (int32_t)sizeof(SVgroupInfo));
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
    }

    if (NULL == taosArrayPush(pSingle, &vgCopy)) {
      ctgError("taosArrayPush vgroupInfo to array failed, vgId:%d, tbName:%s", vgId, pTableName->tname);
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
    }

    /* Hand the array to the caller and forget it locally so the cleanup
     * section does not destroy it. */
    *pVgroupList = pSingle;
    pSingle = NULL;
  } else {
    /* Super table: the helper fills the output list from the db's vgroups. */
    CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, pDbVg, pVgroupList));
  }

_return:
  tfree(pMeta);

  if (pDbVg) {
    CTG_UNLOCK(CTG_READ, &pDbVg->lock);
    taosHashRelease(pCatalog->dbCache.cache, pDbVg);
  }

  if (pSingle) {
    taosArrayDestroy(pSingle);
    pSingle = NULL;
  }

  CTG_RET(code);
}


H
Haojun Liao 已提交
1169
/*
 * Find the vgroup a table name hashes to within its database.
 *
 * pCatalog     - catalog handle (must not be NULL)
 * pTransporter - transport handle, forwarded to ctgGetDBVgroup
 * pMgmtEps     - mnode endpoint set, forwarded to ctgGetDBVgroup
 * pTableName   - table to resolve (must not be NULL)
 * pVgroup      - out: receives the selected vgroup (must not be NULL)
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_CTG_INVALID_INPUT when pCatalog,
 * pTableName or pVgroup is NULL, or the error code of a failing helper.
 */
int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter, const SEpSet *pMgmtEps, const SName *pTableName, SVgroupInfo *pVgroup) {
  // Guard the pointers this function itself hands to helpers that read or
  // write through them; the other public entry points validate the same way.
  // NOTE(review): pTransporter/pMgmtEps are left unchecked to avoid rejecting
  // callers that worked before; tighten if ctgGetDBVgroup requires them.
  if (NULL == pCatalog || NULL == pTableName || NULL == pVgroup) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  SDBVgroupInfo* dbInfo = NULL;
  int32_t code = 0;

  char db[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, db);

  CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pTransporter, pMgmtEps, db, false, &dbInfo));

  CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCatalog, dbInfo, pTableName, pVgroup));

_return:

  if (dbInfo) {
    // A non-NULL dbInfo is unlocked and released here on every exit path.
    CTG_UNLOCK(CTG_READ, &dbInfo->lock);  
    taosHashRelease(pCatalog->dbCache.cache, dbInfo);
  }

  CTG_RET(code);
}


D
dapan1121 已提交
1191 1192
/*
 * Resolve every table named in a catalog request.
 *
 * pCatalog - catalog handle
 * pRpc     - transport handle
 * pMgmtEps - mnode endpoint set
 * pReq     - request; only pReq->pTableName (array of SName) is consulted
 * pRsp     - out: pRsp->pTableMeta becomes an array of STableMeta pointers,
 *            one per requested name, in request order
 *
 * A request without a table-name array succeeds and leaves pRsp untouched.
 * On failure every meta collected so far is freed and pRsp->pTableMeta is
 * reset to NULL.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_CTG_INVALID_INPUT (NULL argument or an
 * empty name array), TSDB_CODE_CTG_MEM_ERROR, or the error of ctgGetTableMeta.
 */
int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp) {
  if (!(pCatalog && pRpc && pMgmtEps && pReq && pRsp)) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  int32_t code = 0;

  if (NULL == pReq->pTableName) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t tbNum = (int32_t)taosArrayGetSize(pReq->pTableName);
  if (tbNum <= 0) {
    ctgError("empty table name list, tbNum:%d", tbNum);
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  pRsp->pTableMeta = taosArrayInit(tbNum, POINTER_BYTES);
  if (NULL == pRsp->pTableMeta) {
    ctgError("taosArrayInit %d failed", tbNum);
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
  }

  for (int32_t idx = 0; idx < tbNum; ++idx) {
    STableMeta *pOneMeta = NULL;
    SName      *pName = taosArrayGet(pReq->pTableName, idx);

    CTG_ERR_JRET(ctgGetTableMeta(pCatalog, pRpc, pMgmtEps, pName, false, &pOneMeta, -1));

    if (NULL == taosArrayPush(pRsp->pTableMeta, &pOneMeta)) {
      ctgError("taosArrayPush failed, idx:%d", idx);
      /* Not stored in the array, so the cleanup loop would miss it. */
      tfree(pOneMeta);
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
    }
  }

  return TSDB_CODE_SUCCESS;

_return:

  if (pRsp->pTableMeta) {
    /* Free every meta already collected, then the array itself. */
    int32_t collected = taosArrayGetSize(pRsp->pTableMeta);
    for (int32_t idx = 0; idx < collected; ++idx) {
      STableMeta *pStored = taosArrayGetP(pRsp->pTableMeta, idx);
      tfree(pStored);
    }

    taosArrayDestroy(pRsp->pTableMeta);
    pRsp->pTableMeta = NULL;
  }

  CTG_RET(code);
}
D
dapan 已提交
1242

D
dapan1121 已提交
1243 1244
/*
 * Placeholder for fetching the qnode list: only the arguments are validated
 * and pQnodeList is not modified.
 *
 * Returns TSDB_CODE_CTG_INVALID_INPUT on a NULL argument, otherwise
 * TSDB_CODE_SUCCESS.
 */
int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray* pQnodeList) {
  if (!(pCatalog && pRpc && pMgmtEps && pQnodeList)) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  //TODO

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268
/*
 * Fetch entries from the catalog's super-table rent list via ctgMetaRentGet().
 *
 * stables - out: receives the entries (SSTableMetaVersion records)
 * num     - out: receives the entry count
 *
 * Returns TSDB_CODE_CTG_INVALID_INPUT on a NULL argument, otherwise the
 * result of ctgMetaRentGet().
 */
int32_t catalogGetExpiredSTables(struct SCatalog* pCatalog, SSTableMetaVersion **stables, uint32_t *num) {
  if (!(pCatalog && stables && num)) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  int32_t rentCode = ctgMetaRentGet(&pCatalog->stableRent, (void **)stables, num, sizeof(SSTableMetaVersion));

  CTG_RET(rentCode);
}

/*
 * Fetch entries from the catalog's database rent list via ctgMetaRentGet().
 *
 * dbs - out: receives the entries (SDbVgVersion records)
 * num - out: receives the entry count
 *
 * Returns TSDB_CODE_CTG_INVALID_INPUT on a NULL argument, otherwise the
 * result of ctgMetaRentGet().
 */
int32_t catalogGetExpiredDBs(struct SCatalog* pCatalog, SDbVgVersion **dbs, uint32_t *num) {
  if (!(pCatalog && dbs && num)) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  int32_t rentCode = ctgMetaRentGet(&pCatalog->dbRent, (void **)dbs, num, sizeof(SDbVgVersion));

  CTG_RET(rentCode);
}

D
dapan 已提交
1269

D
dapan 已提交
1270 1271 1272 1273 1274
/*
 * Tear down the process-wide catalog manager: destroy the cluster cache if
 * one exists and clear the pointer so catalogInit() can run again.
 * NOTE(review): only the hash itself is cleaned up here (marked TBD); the
 * handles stored in it are not visited.
 */
void catalogDestroy(void) {
  if (NULL != ctgMgmt.pCluster) {
    taosHashCleanup(ctgMgmt.pCluster); //TBD
    ctgMgmt.pCluster = NULL;
  }

  qInfo("catalog destroyed");
}