/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

D
dapan1121 已提交
16
#include "trpc.h"
D
dapan1121 已提交
17
#include "query.h"
D
dapan1121 已提交
18
#include "tname.h"
H
Haojun Liao 已提交
19
#include "catalogInt.h"
20

D
dapan1121 已提交
21 22
/* File-level catalog manager state, zero-initialized; catalogInit() fills in
 * ctgMgmt.cfg and creates the ctgMgmt.pCluster hash. */
SCatalogMgmt ctgMgmt = {0};

D
dapan1121 已提交
23
/*
 * Look up the vgroup info of a database in the catalog's db cache.
 *
 * pCatalog  catalog handle whose dbCache is searched
 * dbName    NUL-terminated database name used as the hash key
 * dbInfo    out: set to the cache entry on a hit; untouched on a miss
 * inCache   out: true on a hit, false on a miss
 *
 * Always returns TSDB_CODE_SUCCESS; a miss is reported through *inCache.
 *
 * On a hit the entry is returned with its read lock still held and still
 * acquired from the hash: this function neither unlocks nor releases it on
 * that path.
 * NOTE(review): presumably the caller must CTG_UNLOCK and taosHashRelease the
 * entry when done - confirm against callers.
 */
int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, SDBVgroupInfo **dbInfo, bool *inCache) {
  if (NULL == pCatalog->dbCache.cache) {
    *inCache = false;
    ctgWarn("empty db cache, dbName:%s", dbName);
    return TSDB_CODE_SUCCESS;
  }

  for (;;) {
    SDBVgroupInfo *entry = taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName));
    if (NULL == entry) {
      *inCache = false;
      ctgWarn("not in db vgroup cache, dbName:%s", dbName);
      return TSDB_CODE_SUCCESS;
    }

    CTG_LOCK(CTG_READ, &entry->lock);
    if (NULL != entry->vgInfo) {
      // usable entry: hand it out locked and acquired
      *dbInfo = entry;
      *inCache = true;

      ctgDebug("Got db vgroup from cache, dbName:%s", dbName);

      return TSDB_CODE_SUCCESS;
    }

    // entry exists but its vgInfo is NULL: drop it and look it up again
    CTG_UNLOCK(CTG_READ, &entry->lock);
    taosHashRelease(pCatalog->dbCache.cache, entry);
    ctgWarn("db cache vgInfo is NULL, dbName:%s", dbName);
  }
}



/*
 * Fetch a database's vgroup info from the mnode with a synchronous
 * TDMT_MND_USE_DB request.
 *
 * pCatalog  catalog handle (not dereferenced directly in this function)
 * pRpc      rpc handle passed to rpcSendRecv
 * pMgmtEps  mnode endpoint set the request is sent to
 * input     request description; input->db is also used for logging
 * out       filled by the TDMT_MND_USE_DB response processor
 *
 * Returns TSDB_CODE_SUCCESS, or the failing code from message build, the rpc
 * response, or response processing.
 *
 * NOTE(review): ownership of `msg` and of rpcRsp.pCont is not visible here;
 * neither is freed by this function - confirm the rpc layer releases them.
 */
int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SBuildUseDBInput *input, SUseDbOutput *out) {
  char   *msg = NULL;
  int32_t msgLen = 0;

  ctgDebug("try to get db vgroup from mnode, db:%s", input->db);

  int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)](input, &msg, 0, &msgLen);
  if (code) {
    ctgError("Build use db msg failed, code:%x, db:%s", code, input->db);
    CTG_ERR_RET(code);
  }

  SRpcMsg rpcMsg = {
      .msgType = TDMT_MND_USE_DB,
      .pCont   = msg,
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};

  rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
  if (TSDB_CODE_SUCCESS != rpcRsp.code) {
    ctgError("error rsp for use db, code:%x, db:%s", rpcRsp.code, input->db);
    CTG_ERR_RET(rpcRsp.code);
  }

  code = queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)](out, rpcRsp.pCont, rpcRsp.contLen);
  if (code) {
    ctgError("Process use db rsp failed, code:%x, db:%s", code, input->db);
    CTG_ERR_RET(code);
  }

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
98 99


H
Haojun Liao 已提交
100
/*
 * Look up a table's meta in the table cache and give the caller a private
 * heap copy of it.
 *
 * pCatalog    catalog handle whose tableCache is searched
 * pTableName  table to look up (its full name is the hash key)
 * pTableMeta  out: cloned meta on a hit (caller frees), NULL otherwise
 * exist       out: 1 when a usable meta was returned, 0 otherwise
 *
 * For a child table the clone only carries the child part; the schema part is
 * copied in from the super table entry found in stableCache by suid, under the
 * stableLock read lock. If the super table is missing the lookup is reported
 * as a miss (*exist = 0) rather than as an error.
 *
 * Returns TSDB_CODE_SUCCESS on hit or miss, TSDB_CODE_CTG_INTERNAL_ERROR on a
 * suid mismatch and TSDB_CODE_CTG_MEM_ERROR when the buffer cannot be grown.
 * On every error path *pTableMeta has been freed and *exist is 0.
 */
int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableName, STableMeta** pTableMeta, int32_t *exist) {
  if (NULL == pCatalog->tableCache.cache) {
    *exist = 0;
    ctgWarn("empty tablemeta cache, tbName:%s", pTableName->tname);
    return TSDB_CODE_SUCCESS;
  }

  char tbFullName[TSDB_TABLE_FNAME_LEN];
  tNameExtractFullName(pTableName, tbFullName);

  *pTableMeta = NULL;

  size_t sz = 0;
  STableMeta *tbMeta = taosHashGetCloneExt(pCatalog->tableCache.cache, tbFullName, strlen(tbFullName), NULL, (void **)pTableMeta, &sz);

  if (NULL == *pTableMeta) {
    *exist = 0;
    ctgDebug("tablemeta not in cache, tbName:%s", tbFullName);
    return TSDB_CODE_SUCCESS;
  }

  *exist = 1;

  tbMeta = *pTableMeta;

  if (tbMeta->tableType != TSDB_CHILD_TABLE) {
    ctgDebug("Got tablemeta from cache, tbName:%s", tbFullName);

    return TSDB_CODE_SUCCESS;
  }

  CTG_LOCK(CTG_READ, &pCatalog->tableCache.stableLock);

  STableMeta **stbMeta = taosHashGet(pCatalog->tableCache.stableCache, &tbMeta->suid, sizeof(tbMeta->suid));
  if (NULL == stbMeta || NULL == *stbMeta) {
    CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock);
    ctgError("stable not in stableCache, suid:%"PRIx64, tbMeta->suid);
    tfree(*pTableMeta);
    *exist = 0;
    return TSDB_CODE_SUCCESS;
  }

  if ((*stbMeta)->suid != tbMeta->suid) {
    // log BEFORE freeing: tbMeta aliases *pTableMeta, so reading it after tfree is a use-after-free
    ctgError("stable suid in stableCache mis-match, expected suid:%"PRIx64 ",actual suid:%"PRIx64, tbMeta->suid, (*stbMeta)->suid);
    CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock);
    tfree(*pTableMeta);
    *exist = 0;
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  int32_t metaSize = sizeof(STableMeta) + ((*stbMeta)->tableInfo.numOfTags + (*stbMeta)->tableInfo.numOfColumns) * sizeof(SSchema);

  // keep the old pointer until realloc succeeds, otherwise the clone leaks on failure
  void *resized = realloc(*pTableMeta, metaSize);
  if (NULL == resized) {
    CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock);
    ctgError("realloc size[%d] failed", metaSize);
    tfree(*pTableMeta);
    *exist = 0;
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
  }
  *pTableMeta = resized;

  // copy everything from sversion onwards (the part after the child-table header) from the super table
  memcpy(&(*pTableMeta)->sversion, &(*stbMeta)->sversion, metaSize - sizeof(SCTableMeta));

  CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock);

  ctgDebug("Got tablemeta from cache, tbName:%s", tbFullName);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
166 167
int32_t ctgGetTableTypeFromCache(struct SCatalog* pCatalog, const SName* pTableName, int32_t *tbType) {
  if (NULL == pCatalog->tableCache.cache) {
D
dapan1121 已提交
168
    ctgWarn("empty tablemeta cache, tbName:%s", pTableName->tname);  
D
dapan1121 已提交
169 170 171 172 173 174 175 176 177 178 179 180
    return TSDB_CODE_SUCCESS;
  }

  char tbFullName[TSDB_TABLE_FNAME_LEN];
  tNameExtractFullName(pTableName, tbFullName);

  size_t sz = 0;
  STableMeta *pTableMeta = NULL;
  
  taosHashGetCloneExt(pCatalog->tableCache.cache, tbFullName, strlen(tbFullName), NULL, (void **)&pTableMeta, &sz);

  if (NULL == pTableMeta) {
D
dapan1121 已提交
181 182
    ctgWarn("tablemeta not in cache, tbName:%s", tbFullName);  
  
D
dapan1121 已提交
183 184 185 186
    return TSDB_CODE_SUCCESS;
  }

  *tbType = pTableMeta->tableType;
D
dapan1121 已提交
187 188

  ctgDebug("Got tabletype from cache, tbName:%s, type:%d", tbFullName, *tbType);  
D
dapan1121 已提交
189 190 191 192 193
  
  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
194 195 196 197 198 199 200 201 202 203
/*
 * Build an endpoint set from a vgroup's endpoint list.
 *
 * epSet       out: inUse is reset to 0, numOfEps and each port/fqdn slot are
 *             copied from vgroupInfo
 * vgroupInfo  source vgroup; its first numOfEps epAddr entries are copied
 *
 * No bound check is made against the capacity of epSet's arrays.
 */
void ctgGenEpSet(SEpSet *epSet, SVgroupInfo *vgroupInfo) {
  epSet->numOfEps = vgroupInfo->numOfEps;
  epSet->inUse = 0;

  int32_t idx = 0;
  while (idx < vgroupInfo->numOfEps) {
    // copy sizes are those of the destination slots
    memcpy(&epSet->port[idx], &vgroupInfo->epAddr[idx].port, sizeof(epSet->port[idx]));
    memcpy(&epSet->fqdn[idx], &vgroupInfo->epAddr[idx].fqdn, sizeof(epSet->fqdn[idx]));
    ++idx;
  }
}

D
dapan1121 已提交
204
/*
 * Fetch a (super) table's meta from the mnode with a synchronous
 * TDMT_MND_STB_META request.
 *
 * pCatalog    catalog handle (not dereferenced directly in this function)
 * pRpc        rpc handle passed to rpcSendRecv
 * pMgmtEps    mnode endpoint set the request is sent to
 * tbFullName  full table name placed in the request
 * output      filled by the response processor; output->metaNum is set to 0
 *             when the mnode reports that the table does not exist
 *
 * Returns TSDB_CODE_SUCCESS (also for "table does not exist"), otherwise the
 * failing code from message build, the rpc response, or response processing.
 *
 * NOTE(review): ownership of `msg` and of rpcRsp.pCont is not visible here;
 * neither is freed by this function - confirm the rpc layer releases them.
 */
int32_t ctgGetTableMetaFromMnodeImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, char* tbFullName, STableMetaOutput* output) {
  SBuildTableMetaInput bInput = {.vgId = 0, .dbName = NULL, .tableFullName = tbFullName};
  char   *msg = NULL;
  int32_t msgLen = 0;

  ctgDebug("try to get table meta from mnode, tbName:%s", tbFullName);

  int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_MND_STB_META)](&bInput, &msg, 0, &msgLen);
  if (code) {
    ctgError("Build mnode stablemeta msg failed, code:%x", code);
    CTG_ERR_RET(code);
  }

  SRpcMsg rpcMsg = {
      .msgType = TDMT_MND_STB_META,
      .pCont   = msg,
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};

  rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);

  if (TSDB_CODE_SUCCESS != rpcRsp.code) {
    if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) {
      // a missing table is not an error for the caller; it is signalled via metaNum
      output->metaNum = 0;
      ctgDebug("stablemeta not exist in mnode, tbName:%s", tbFullName);
      return TSDB_CODE_SUCCESS;
    }

    ctgError("error rsp for stablemeta from mnode, code:%x, tbName:%s", rpcRsp.code, tbFullName);
    CTG_ERR_RET(rpcRsp.code);
  }

  code = queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_STB_META)](output, rpcRsp.pCont, rpcRsp.contLen);
  if (code) {
    ctgError("Process mnode stablemeta rsp failed, code:%x, tbName:%s", code, tbFullName);
    CTG_ERR_RET(code);
  }

  ctgDebug("Got table meta from mnode, tbName:%s", tbFullName);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
250 251 252 253 254 255 256
/*
 * Convenience wrapper around ctgGetTableMetaFromMnodeImpl that derives the
 * full table name from pTableName first. Returns whatever the Impl returns.
 */
int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, STableMetaOutput* output) {
  char fullName[TSDB_TABLE_FNAME_LEN];

  tNameExtractFullName(pTableName, fullName);

  int32_t ret = ctgGetTableMetaFromMnodeImpl(pCatalog, pRpc, pMgmtEps, fullName, output);
  return ret;
}

D
dapan1121 已提交
257

D
dapan1121 已提交
258 259
/*
 * Fetch a table's meta from the vnode group that owns it with a synchronous
 * TDMT_VND_TABLE_META request.
 *
 * pCatalog    catalog handle (only NULL-checked here)
 * pRpc        rpc handle passed to rpcSendRecv
 * pMgmtEps    mnode endpoint set (only NULL-checked here)
 * pTableName  table to fetch; supplies the db name and table name of the request
 * vgroupInfo  target vgroup; supplies vgId and the endpoints the request goes to
 * output      filled by the response processor; output->metaNum is set to 0
 *             when the vnode reports that the table does not exist
 *
 * Returns TSDB_CODE_CTG_INVALID_INPUT when any argument is NULL,
 * TSDB_CODE_SUCCESS on success (also for "table does not exist"), otherwise
 * the failing code from message build, the rpc response, or response processing.
 *
 * NOTE(review): ownership of `msg` and of rpcRsp.pCont is not visible here;
 * neither is freed by this function - confirm the rpc layer releases them.
 */
int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) {
  if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == vgroupInfo || NULL == output) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  char dbFullName[TSDB_DB_FNAME_LEN];
  tNameGetFullDbName(pTableName, dbFullName);

  ctgDebug("try to get table meta from vnode, db:%s, tbName:%s", dbFullName, pTableName->tname);

  SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .dbName = dbFullName, .tableFullName = (char *)pTableName->tname};
  char   *msg = NULL;
  int32_t msgLen = 0;

  int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)](&bInput, &msg, 0, &msgLen);
  if (code) {
    ctgError("Build vnode tablemeta msg failed, code:%x, tbName:%s", code, pTableName->tname);
    CTG_ERR_RET(code);
  }

  SRpcMsg rpcMsg = {
      .msgType = TDMT_VND_TABLE_META,
      .pCont   = msg,
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
  SEpSet  epSet;

  // the request goes to the vgroup's own endpoints, not to pMgmtEps
  ctgGenEpSet(&epSet, vgroupInfo);
  rpcSendRecv(pRpc, &epSet, &rpcMsg, &rpcRsp);

  if (TSDB_CODE_SUCCESS != rpcRsp.code) {
    if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) {
      // a missing table is not an error for the caller; it is signalled via metaNum
      output->metaNum = 0;
      ctgDebug("tablemeta not exist in vnode, tbName:%s", pTableName->tname);
      return TSDB_CODE_SUCCESS;
    }

    ctgError("error rsp for table meta from vnode, code:%x, tbName:%s", rpcRsp.code, pTableName->tname);
    CTG_ERR_RET(rpcRsp.code);
  }

  code = queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)](output, rpcRsp.pCont, rpcRsp.contLen);
  if (code) {
    ctgError("Process vnode tablemeta rsp failed, code:%x, tbName:%s", code, pTableName->tname);
    CTG_ERR_RET(code);
  }

  ctgDebug("Got table meta from vnode, db:%s, tbName:%s", dbFullName, pTableName->tname);

  return TSDB_CODE_SUCCESS;
}


314 315
/*
 * Select the table-name hash function for a database's hash method.
 *
 * hashMethod  hash method id; every value currently maps to the same function
 * fp          out: set to MurmurHash3_32
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgGetHashFunction(int8_t hashMethod, tableNameHashFp *fp) {
  (void)hashMethod;  // no method-specific choice yet

  *fp = MurmurHash3_32;

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
324
/*
 * Copy every vgroup of a cached database into a newly created array.
 *
 * pCatalog, pRpc, pMgmtEps  not used by this function
 * dbInfo      cached db entry whose vgInfo hash is iterated
 * vgroupList  out: on success, a new SArray of SVgroupInfo (caller destroys);
 *             not written on failure
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CTG_MEM_ERROR when the array cannot
 * be created or grown (the partially filled array is destroyed).
 *
 * NOTE(review): on the push-failure path the hash iteration is abandoned
 * midway; if the hash API requires cancelling an unfinished iteration, that
 * call is missing here - confirm against the hash implementation.
 */
int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, SDBVgroupInfo *dbInfo, SArray** vgroupList) {
  SArray *vgList = NULL;
  int32_t code = 0;
  int32_t vgNum = taosHashGetSize(dbInfo->vgInfo);

  vgList = taosArrayInit(vgNum, sizeof(SVgroupInfo));
  if (NULL == vgList) {
    ctgError("taosArrayInit failed, num:%d", vgNum);
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);    
  }

  void *pIter = taosHashIterate(dbInfo->vgInfo, NULL);
  while (pIter) {
    SVgroupInfo *vgInfo = pIter;

    if (NULL == taosArrayPush(vgList, vgInfo)) {
      ctgError("taosArrayPush failed, vgId:%d", vgInfo->vgId);
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
    }

    pIter = taosHashIterate(dbInfo->vgInfo, pIter);
  }

  // ownership moves to the caller; clear the local so nothing below can free it
  *vgroupList = vgList;
  vgList = NULL;

  ctgDebug("Got vg list from DB, vgNum:%d", vgNum);

  return TSDB_CODE_SUCCESS;

_return:

  if (vgList) {
    taosArrayDestroy(vgList);
  }

  CTG_RET(code);
}

D
dapan1121 已提交
366
/*
 * Find the vgroup whose hash range contains the hash of a table's full name.
 *
 * pCatalog    not used by this function
 * dbInfo      cached db entry; supplies the hash method and the vgInfo hash
 * pTableName  table whose full name is hashed
 * pVgroup     out: copy of the matching vgroup; written only on success
 *
 * Returns TSDB_CODE_SUCCESS on a match, TSDB_CODE_TSC_DB_NOT_SELECTED when the
 * db has no vgroups, TSDB_CODE_CTG_INTERNAL_ERROR when no vgroup range covers
 * the hash value, or the error from ctgGetHashFunction.
 *
 * NOTE(review): the search loop leaves the hash iteration unfinished when it
 * finds a match; if the hash API requires cancelling an unfinished iteration,
 * that call is missing here - confirm against the hash implementation.
 */
int32_t ctgGetVgInfoFromHashValue(struct SCatalog *pCatalog, SDBVgroupInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup) {
  int32_t code = 0;

  int32_t vgNum = taosHashGetSize(dbInfo->vgInfo);
  char db[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, db);

  if (vgNum <= 0) {
    ctgError("db vgroup cache invalid, db:%s, vgroup number:%d", db, vgNum);
    CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED);
  }

  tableNameHashFp fp = NULL;
  SVgroupInfo *vgInfo = NULL;
  char tbFullName[TSDB_TABLE_FNAME_LEN];

  CTG_ERR_JRET(ctgGetHashFunction(dbInfo->hashMethod, &fp));

  tNameExtractFullName(pTableName, tbFullName);

  uint32_t hashValue = (*fp)(tbFullName, (uint32_t)strlen(tbFullName));

  void *pIter = taosHashIterate(dbInfo->vgInfo, NULL);
  while (pIter) {
    vgInfo = pIter;
    // ranges are inclusive on both ends
    if (hashValue >= vgInfo->hashBegin && hashValue <= vgInfo->hashEnd) {
      break;
    }

    pIter = taosHashIterate(dbInfo->vgInfo, pIter);
    vgInfo = NULL;
  }

  if (NULL == vgInfo) {
    ctgError("no hash range found for hashvalue[%u], db:%s", hashValue, db);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  *pVgroup = *vgInfo;

_return:

  // propagate the recorded error; returning a constant success here would hide
  // failures and leave *pVgroup unset for the caller
  CTG_RET(code);
}

D
dapan1121 已提交
411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428

int32_t ctgSTableVersionCompare(const void* key1, const void* key2) {
  if (((SSTableMetaVersion*)key1)->suid < ((SSTableMetaVersion*)key2)->suid) {
    return -1;
  } else if (((SSTableMetaVersion*)key1)->suid > ((SSTableMetaVersion*)key2)->suid) {
    return 1;
  } else {
    return 0;
  }
}

int32_t ctgDbVgVersionCompare(const void* key1, const void* key2) {
  if (((SDbVgVersion*)key1)->dbId < ((SDbVgVersion*)key2)->dbId) {
    return -1;
  } else if (((SDbVgVersion*)key1)->dbId > ((SDbVgVersion*)key2)->dbId) {
    return 1;
  } else {
    return 0;
D
dapan1121 已提交
429
  }
D
dapan1121 已提交
430 431 432 433 434 435 436 437 438
}


/*
 * Initialise a meta rent manager: reset the read index, derive the number of
 * slots from the rent period and allocate the zeroed slot array.
 *
 * mgmt     manager to initialise
 * rentSec  rent period in seconds; slot count is rentSec / CTG_RENT_SLOT_SECOND
 * type     rent type tag stored in the manager and used in log lines
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CTG_MEM_ERROR when the slot array
 * cannot be allocated.
 */
int32_t ctgMetaRentInit(SMetaRentMgmt *mgmt, uint32_t rentSec, int8_t type) {
  mgmt->slotRIdx = 0;
  mgmt->slotNum = rentSec / CTG_RENT_SLOT_SECOND;
  mgmt->type = type;

  if (0 == mgmt->slotNum) {
    // a rent period shorter than one slot would give zero slots; the add/update/get
    // paths compute `% slotNum`, so keep at least one slot to avoid dividing by zero
    mgmt->slotNum = 1;
    qWarn("rent second %u too small, use one slot, type:%d", rentSec, type);
  }

  size_t msgSize = sizeof(SRentSlotInfo) * mgmt->slotNum;

  mgmt->slots = calloc(mgmt->slotNum, sizeof(SRentSlotInfo));
  if (NULL == mgmt->slots) {
    qError("calloc %d failed", (int32_t)msgSize);
    return TSDB_CODE_CTG_MEM_ERROR;
  }

  qDebug("meta rent initialized, type:%d, slotNum:%d", type, mgmt->slotNum);

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
450

D
dapan1121 已提交
451 452 453 454 455 456 457 458 459 460 461

/*
 * Append one meta record to the rent slot selected by its id.
 *
 * mgmt  rent manager
 * meta  record to copy into the slot's array (`size` bytes)
 * id    record id; the slot index is derived from id % slotNum
 * size  element size used when the slot's array has to be created
 *
 * The slot's array is created lazily on first use. The slot is marked as
 * needing a sort after a successful add.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CTG_MEM_ERROR when the array cannot
 * be created or grown. The slot write lock is taken once and released exactly
 * once, at _return, on every path.
 */
int32_t ctgMetaRentAdd(SMetaRentMgmt *mgmt, void *meta, int64_t id, int32_t size) {
  // ids with the top bit set arrive negative; fold the remainder so the index stays in range
  int64_t rem = id % mgmt->slotNum;
  if (rem < 0) {
    rem = -rem;
  }
  int16_t widx = (int16_t)rem;

  SRentSlotInfo *slot = &mgmt->slots[widx];
  int32_t code = 0;

  CTG_LOCK(CTG_WRITE, &slot->lock);
  if (NULL == slot->meta) {
    slot->meta = taosArrayInit(CTG_DEFAULT_RENT_SLOT_SIZE, size);
    if (NULL == slot->meta) {
      qError("taosArrayInit %d failed, id:%"PRIx64", slot idx:%d, type:%d", CTG_DEFAULT_RENT_SLOT_SIZE, id, widx, mgmt->type);
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
    }
  }

  if (NULL == taosArrayPush(slot->meta, meta)) {
    qError("taosArrayPush meta to rent failed, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
  }

  slot->needSort = true;

  qDebug("add meta to rent, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);

_return:

  CTG_UNLOCK(CTG_WRITE, &slot->lock);
  CTG_RET(code);
}

/*
 * Overwrite an existing meta record in its rent slot, falling back to adding
 * it when it cannot be found.
 *
 * mgmt     rent manager
 * meta     new record contents (`size` bytes)
 * id       record id; selects the slot and is the search key
 * size     record size in bytes
 * compare  ordering used both to sort the slot and to search it
 *
 * The slot is sorted first if it was modified since the last sort. When the
 * slot is empty or the record is missing, the update is retried as
 * ctgMetaRentAdd after the slot lock has been released.
 *
 * Returns TSDB_CODE_SUCCESS, or the result of the fallback ctgMetaRentAdd.
 * The slot write lock is taken once and released exactly once, at _return.
 */
int32_t ctgMetaRentUpdate(SMetaRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t compare) {
  // ids with the top bit set arrive negative; fold the remainder so the index stays in range
  int64_t rem = id % mgmt->slotNum;
  if (rem < 0) {
    rem = -rem;
  }
  int16_t widx = (int16_t)rem;

  SRentSlotInfo *slot = &mgmt->slots[widx];
  int32_t code = 0;

  CTG_LOCK(CTG_WRITE, &slot->lock);
  if (NULL == slot->meta) {
    qError("meta in slot is empty, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
  }

  if (slot->needSort) {
    taosArraySort(slot->meta, compare);
    slot->needSort = false;
    qDebug("slot meta sorted, slot idx:%d, type:%d", widx, mgmt->type);
  }

  // the id itself is passed as the search key
  // NOTE(review): this relies on the id being the leading field of the stored records - confirm
  void *orig = taosArraySearch(slot->meta, &id, compare, TD_EQ);
  if (NULL == orig) {
    qError("meta not found in slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  memcpy(orig, meta, size);

  qDebug("meta in rent updated, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);

_return:

  CTG_UNLOCK(CTG_WRITE, &slot->lock);

  if (code) {
    qWarn("meta in rent update failed, will try to add it, code:%x, id:%"PRIx64", slot idx:%d, type:%d", code, id, widx, mgmt->type);
    CTG_RET(ctgMetaRentAdd(mgmt, meta, id, size));
  }

  CTG_RET(code);
}

/*
 * Copy out all meta records of the next rent slot.
 *
 * mgmt  rent manager; its read index is advanced atomically and wrapped at slotNum
 * res   out: malloc'ed copy of the slot's records (caller frees), or NULL when
 *       the slot is empty or on failure
 * num   out: number of records in *res, 0 when the slot is empty or on failure
 * size  size in bytes of one record
 *
 * Returns TSDB_CODE_SUCCESS (also for an empty slot), or
 * TSDB_CODE_CTG_MEM_ERROR when the copy buffer cannot be allocated.
 */
int32_t ctgMetaRentGetImpl(SMetaRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
  // give the outputs a defined value on every path, including empty slots and failures
  *res = NULL;
  *num = 0;

  int16_t ridx = atomic_add_fetch_16(&mgmt->slotRIdx, 1);
  if (ridx >= mgmt->slotNum) {
    ridx %= mgmt->slotNum;
    atomic_store_16(&mgmt->slotRIdx, ridx);
  }

  SRentSlotInfo *slot = &mgmt->slots[ridx];
  int32_t code = 0;

  CTG_LOCK(CTG_READ, &slot->lock);
  if (NULL == slot->meta) {
    qDebug("empty meta in slot:%d, type:%d", ridx, mgmt->type);
    goto _return;
  }

  size_t metaNum = taosArrayGetSize(slot->meta);
  if (metaNum <= 0) {
    qDebug("no meta in slot:%d, type:%d", ridx, mgmt->type);
    goto _return;
  }

  size_t msize = metaNum * size;
  *res = malloc(msize);
  if (NULL == *res) {
    qError("malloc %d failed", (int32_t)msize);
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
  }

  // the records are copied as one contiguous run starting at element 0
  void *meta = taosArrayGet(slot->meta, 0);

  memcpy(*res, meta, msize);

  *num = (uint32_t)metaNum;

  qDebug("Got %d meta from rent, type:%d", (int32_t)metaNum, mgmt->type);

_return:

  CTG_UNLOCK(CTG_READ, &slot->lock);

  CTG_RET(code);
}

/*
 * Rate-limited front end of ctgMetaRentGetImpl.
 *
 * If less than CTG_RENT_SLOT_SECOND seconds have passed since the last
 * successful claim, returns TSDB_CODE_SUCCESS with *res = NULL and *num = 0.
 * Otherwise the caller that wins the compare-and-swap on lastReadMsec reads
 * the next slot; a caller that loses re-evaluates from the top.
 *
 * mgmt, res, num, size  forwarded to ctgMetaRentGetImpl
 */
int32_t ctgMetaRentGet(SMetaRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
  for (;;) {
    int64_t nowMs = taosGetTimestampMs();
    int64_t lastMs = atomic_load_64(&mgmt->lastReadMsec);

    if ((nowMs - lastMs) < CTG_RENT_SLOT_SECOND * 1000) {
      *res = NULL;
      *num = 0;
      qDebug("too short time period to get expired meta, type:%d", mgmt->type);
      return TSDB_CODE_SUCCESS;
    }

    // claim this time window; stop looping only if nobody else changed the stamp meanwhile
    if (lastMs == atomic_val_compare_exchange_64(&mgmt->lastReadMsec, lastMs, nowMs)) {
      break;
    }
  }

  CTG_ERR_RET(ctgMetaRentGetImpl(mgmt, res, num, size));

  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
596

D
dapan1121 已提交
597
/*
 * Store the table meta received from a meta response in the catalog caches.
 *
 * pCatalog  catalog handle whose tableCache is updated
 * output    meta response: metaNum must be 1 (tbMeta only) or 2 (child meta in
 *           ctbMeta/ctbFname plus its super table in tbMeta/tbFname)
 *
 * The table cache and the super table cache are created lazily; a concurrent
 * creator that loses the compare-and-swap discards its own hash.
 * A super table is stored in the table cache and indexed by suid in the
 * super table cache under the stableLock write lock, and its version record
 * is added to (or updated in) the stable rent.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_CTG_INTERNAL_ERROR for an invalid
 * response, TSDB_CODE_CTG_MEM_ERROR when a cache cannot be created or written,
 * or the error from the rent add/update.
 */
int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output) {
  int32_t code = 0;

  if (output->metaNum != 1 && output->metaNum != 2) {
    ctgError("invalid table meta number in meta rsp, num:%d", output->metaNum);
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  if (NULL == output->tbMeta) {
    ctgError("no valid table meta got from meta rsp, tbName:%s", output->tbFname);
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  // with two metas the second one must be the super table; check this before touching
  // any cache so a malformed response does not leave a child entry behind
  if (output->metaNum == 2 && TSDB_SUPER_TABLE != output->tbMeta->tableType) {
    ctgError("table type error, expected:%d, actual:%d", TSDB_SUPER_TABLE, output->tbMeta->tableType);
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  if (NULL == pCatalog->tableCache.cache) {
    SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
    if (NULL == cache) {
      ctgError("taosHashInit failed, num:%d", ctgMgmt.cfg.maxTblCacheNum);
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }

    // someone else installed a cache first: drop ours
    if (NULL != atomic_val_compare_exchange_ptr(&pCatalog->tableCache.cache, NULL, cache)) {
      taosHashCleanup(cache);
    }
  }

  if (NULL == pCatalog->tableCache.stableCache) {
    SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
    if (NULL == cache) {
      ctgError("taosHashInit failed, num:%d", ctgMgmt.cfg.maxTblCacheNum);
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }

    // someone else installed a cache first: drop ours
    if (NULL != atomic_val_compare_exchange_ptr(&pCatalog->tableCache.stableCache, NULL, cache)) {
      taosHashCleanup(cache);
    }
  }

  if (output->metaNum == 2) {
    if (taosHashPut(pCatalog->tableCache.cache, output->ctbFname, strlen(output->ctbFname), &output->ctbMeta, sizeof(output->ctbMeta)) != 0) {
      ctgError("taosHashPut ctablemeta to cache failed, ctbName:%s", output->ctbFname);
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }

    ctgDebug("update tablemeta to cache, tbName:%s", output->ctbFname);
  }

  // header plus one SSchema per column and per tag
  int32_t tbSize = sizeof(*output->tbMeta) + sizeof(SSchema) * (output->tbMeta->tableInfo.numOfColumns + output->tbMeta->tableInfo.numOfTags);

  if (TSDB_SUPER_TABLE == output->tbMeta->tableType) {
    bool newAdded = false;
    SSTableMetaVersion metaRent = {.suid = output->tbMeta->suid, .sversion = output->tbMeta->sversion, .tversion = output->tbMeta->tversion};

    CTG_LOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
    if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) {
      CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
      ctgError("taosHashPut tablemeta to cache failed, tbName:%s", output->tbFname);
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }

    // the suid index stores a pointer to the entry that now lives inside the table cache
    STableMeta *tbMeta = taosHashGet(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname));
    if (taosHashPutExt(pCatalog->tableCache.stableCache, &output->tbMeta->suid, sizeof(output->tbMeta->suid), &tbMeta, POINTER_BYTES, &newAdded) != 0) {
      CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
      ctgError("taosHashPutExt stable to stable cache failed, suid:%"PRIx64, output->tbMeta->suid);
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }
    CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);

    ctgDebug("update stable to cache, suid:%"PRIx64, output->tbMeta->suid);

    if (newAdded) {
      CTG_ERR_RET(ctgMetaRentAdd(&pCatalog->stableRent, &metaRent, metaRent.suid, sizeof(SSTableMetaVersion)));
    } else {
      CTG_ERR_RET(ctgMetaRentUpdate(&pCatalog->stableRent, &metaRent, metaRent.suid, sizeof(SSTableMetaVersion), ctgSTableVersionCompare));
    }
  } else {
    if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) {
      ctgError("taosHashPut tablemeta to cache failed, tbName:%s", output->tbFname);
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }
  }

  ctgDebug("update tablemeta to cache, tbName:%s", output->tbFname);

  CTG_RET(code);
}

D
dapan1121 已提交
688

D
dapan1121 已提交
689 690
/*
 * Get a database's vgroup info, from the cache when allowed, otherwise by
 * refreshing it from the mnode.
 *
 * pCatalog     catalog handle
 * pRpc         rpc handle used for the mnode request
 * pMgmtEps     mnode endpoint set
 * dbName       database name
 * forceUpdate  0 to try the cache first; any other value skips the cache
 * dbInfo       out: cache entry, as returned by ctgGetDBVgroupFromCache
 *
 * After each refresh the entry is read back from the cache; if it is not
 * there the refresh is repeated, with no retry limit.
 *
 * Returns TSDB_CODE_SUCCESS or the first error from the cache lookup, the
 * mnode request or the cache update.
 */
int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo** dbInfo) {
  bool cached = false;

  if (0 == forceUpdate) {
    CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &cached));

    if (cached) {
      return TSDB_CODE_SUCCESS;
    }
  }

  SBuildUseDBInput request = {0};
  SUseDbOutput reply = {0};

  // bounded copy with explicit termination
  strncpy(request.db, dbName, sizeof(request.db));
  request.db[sizeof(request.db) - 1] = 0;
  request.vgVersion = CTG_DEFAULT_INVALID_VERSION;

  do {
    CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &request, &reply));

    CTG_ERR_RET(catalogUpdateDBVgroup(pCatalog, dbName, &reply.dbVgroup));

    CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &cached));

    if (!cached) {
      ctgWarn("can't get db vgroup from cache, will retry, db:%s", dbName);
    }
  } while (!cached);

  return TSDB_CODE_SUCCESS;
}


/*
 * Prepare the db cache for a newer vgroup version: if the cached entry for
 * dbName is older than dbInfo, its vgInfo hash is destroyed and set to NULL.
 *
 * pCatalog  catalog handle whose dbCache is inspected
 * dbName    database name (hash key)
 * dbInfo    incoming vgroup info; only its vgVersion is read
 *
 * Nothing happens when the db is not cached or the incoming version is not
 * newer than the cached one. The entry is modified under its write lock.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) {
  SDBVgroupInfo *cached = (SDBVgroupInfo *)taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName));
  if (NULL == cached) {
    return TSDB_CODE_SUCCESS;
  }

  CTG_LOCK(CTG_WRITE, &cached->lock);

  if (dbInfo->vgVersion <= cached->vgVersion) {
    ctgInfo("db vgVersion is not new, db:%s, vgVersion:%d, current:%d", dbName, dbInfo->vgVersion, cached->vgVersion);
  } else if (cached->vgInfo) {
    ctgInfo("cleanup db vgInfo, db:%s", dbName);
    taosHashCleanup(cached->vgInfo);
    cached->vgInfo = NULL;
  }

  CTG_UNLOCK(CTG_WRITE, &cached->lock);
  taosHashRelease(pCatalog->dbCache.cache, cached);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
751 752 753 754 755 756 757 758 759 760 761 762 763 764 765
int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) {
  if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  SVgroupInfo vgroupInfo = {0};
  int32_t code = 0;

  CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo));

  STableMetaOutput voutput = {0};
  STableMetaOutput moutput = {0};
  STableMetaOutput *output = &voutput;

  if (CTG_IS_STABLE(isSTable)) {
D
dapan1121 已提交
766 767
    ctgDebug("will renew table meta, supposed to be stable, tbName:%s", pTableName->tname);
    
D
dapan1121 已提交
768 769 770 771 772 773 774 775
    CTG_ERR_JRET(ctgGetTableMetaFromMnode(pCatalog, pTransporter, pMgmtEps, pTableName, &moutput));

    if (0 == moutput.metaNum) {
      CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &voutput));
    } else {
      output = &moutput;
    }
  } else {
D
dapan1121 已提交
776 777
    ctgDebug("will renew table meta, not supposed to be stable, tbName:%s, isStable:%d", pTableName->tname, isSTable);
  
D
dapan1121 已提交
778 779 780
    CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &voutput));

    if (voutput.metaNum > 0 && TSDB_SUPER_TABLE == voutput.tbMeta->tableType) {
D
dapan1121 已提交
781 782
      ctgDebug("will continue to renew table meta since got stable, tbName:%s, metaNum:%d", pTableName->tname, voutput.metaNum);
      
D
dapan1121 已提交
783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800
      CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCatalog, pTransporter, pMgmtEps, voutput.tbFname, &moutput));

      tfree(voutput.tbMeta);
      voutput.tbMeta = moutput.tbMeta;
      moutput.tbMeta = NULL;
    }
  }

  CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, output));

_return:

  tfree(voutput.tbMeta);
  tfree(moutput.tbMeta);
  
  CTG_RET(code);
}

D
dapan1121 已提交
801

D
dapan1121 已提交
802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838
/*
 * Get a table's meta, from the cache when allowed, otherwise by refreshing it
 * from the cluster and reading it back from the cache.
 *
 * pCatalog     catalog handle
 * pRpc         rpc handle
 * pMgmtEps     mnode endpoint set
 * pTableName   table to look up
 * forceUpdate  true to skip the initial cache lookup
 * pTableMeta   out: heap copy of the meta on success (caller frees)
 * isSTable     hint on the table kind; when forcing an update with an unknown
 *              hint, it is refined from the cached table type if available
 *
 * Returns TSDB_CODE_CTG_INVALID_INPUT for a NULL argument,
 * TSDB_CODE_CTG_INTERNAL_ERROR when the meta is missing from the cache right
 * after a successful refresh, otherwise TSDB_CODE_SUCCESS or the first error
 * from the cache lookup or the refresh.
 */
int32_t ctgGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, bool forceUpdate, STableMeta** pTableMeta, int32_t isSTable) {
  if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  int32_t exist = 0;

  if (!forceUpdate) {
    CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pTableName, pTableMeta, &exist));

    if (exist) {
      if (CTG_TBTYPE_MATCH(isSTable, (*pTableMeta)->tableType)) {
        return TSDB_CODE_SUCCESS;
      }

      // cached copy has the wrong table type: release it before renewing, because the
      // lookup below resets *pTableMeta and the copy would otherwise be leaked
      tfree(*pTableMeta);
    }
  } else if (CTG_IS_UNKNOWN_STABLE(isSTable)) {
    int32_t tbType = 0;

    CTG_ERR_RET(ctgGetTableTypeFromCache(pCatalog, pTableName, &tbType));

    CTG_SET_STABLE(isSTable, tbType);
  }

  CTG_ERR_RET(ctgRenewTableMetaImpl(pCatalog, pRpc, pMgmtEps, pTableName, isSTable));

  CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pTableName, pTableMeta, &exist));

  if (0 == exist) {
    ctgError("renew tablemeta succeed but get from cache failed, may be deleted, tbName:%s", pTableName->tname);
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Release a catalog handle.
 *
 * Currently an empty placeholder: nothing reachable from pCatalog is
 * released, so passing a handle here does not give its memory back.
 */
void ctgFreeHandle(struct SCatalog* pCatalog) {
  //TODO
}

D
dapan1121 已提交
839
/*
 * Initialise the global catalog manager (ctgMgmt).
 *
 * cfg - optional configuration. NULL selects the built-in defaults; otherwise
 *       the struct is copied and every field left at 0 is replaced by its
 *       default.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_CTG_INVALID_INPUT if the cluster cache
 * already exists, or TSDB_CODE_CTG_INTERNAL_ERROR if it cannot be created.
 */
int32_t catalogInit(SCatalogCfg *cfg) {
  if (NULL != ctgMgmt.pCluster) {
    qError("catalog already init");
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  if (NULL == cfg) {
    // No user configuration: use the defaults for all four tunables.
    ctgMgmt.cfg.maxDBCacheNum  = CTG_DEFAULT_CACHE_DB_NUMBER;
    ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TABLEMETA_NUMBER;
    ctgMgmt.cfg.dbRentSec      = CTG_DEFAULT_RENT_SECOND;
    ctgMgmt.cfg.stableRentSec  = CTG_DEFAULT_RENT_SECOND;
  } else {
    memcpy(&ctgMgmt.cfg, cfg, sizeof(*cfg));

    // A zero field means "not set": fall back to the default.
    if (0 == ctgMgmt.cfg.maxDBCacheNum) {
      ctgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
    }

    if (0 == ctgMgmt.cfg.maxTblCacheNum) {
      ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TABLEMETA_NUMBER;
    }

    if (0 == ctgMgmt.cfg.dbRentSec) {
      ctgMgmt.cfg.dbRentSec = CTG_DEFAULT_RENT_SECOND;
    }

    if (0 == ctgMgmt.cfg.stableRentSec) {
      ctgMgmt.cfg.stableRentSec = CTG_DEFAULT_RENT_SECOND;
    }
  }

  // Per-cluster catalog handles are keyed by the 64-bit cluster id.
  ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == ctgMgmt.pCluster) {
    qError("taosHashInit %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER);
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  qDebug("catalog initialized, maxDb:%u, maxTbl:%u, dbRentSec:%u, stableRentSec:%u", ctgMgmt.cfg.maxDBCacheNum, ctgMgmt.cfg.maxTblCacheNum, ctgMgmt.cfg.dbRentSec, ctgMgmt.cfg.stableRentSec);

  return TSDB_CODE_SUCCESS;
}

881 882
int32_t catalogGetHandle(uint64_t clusterId, struct SCatalog** catalogHandle) {
  if (NULL == catalogHandle) {
D
dapan1121 已提交
883
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan 已提交
884 885 886
  }

  if (NULL == ctgMgmt.pCluster) {
D
dapan1121 已提交
887
    qError("cluster cache are not ready, clusterId:%"PRIx64, clusterId);
D
dapan1121 已提交
888
    CTG_ERR_RET(TSDB_CODE_CTG_NOT_READY);
D
dapan 已提交
889 890
  }

D
dapan1121 已提交
891 892
  int32_t code = 0;
  SCatalog *clusterCtg = NULL;
D
dapan 已提交
893

D
dapan1121 已提交
894 895
  while (true) {
    SCatalog **ctg = (SCatalog **)taosHashGet(ctgMgmt.pCluster, (char*)&clusterId, sizeof(clusterId));
D
dapan 已提交
896

D
dapan1121 已提交
897 898 899 900 901
    if (ctg && (*ctg)) {
      *catalogHandle = *ctg;
      qDebug("got catalog handle from cache, clusterId:%"PRIx64", CTG:%p", clusterId, *ctg);
      return TSDB_CODE_SUCCESS;
    }
D
dapan 已提交
902

D
dapan1121 已提交
903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925
    clusterCtg = calloc(1, sizeof(SCatalog));
    if (NULL == clusterCtg) {
      qError("calloc %d failed", (int32_t)sizeof(SCatalog));
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }

    CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->dbRent, ctgMgmt.cfg.dbRentSec, CTG_RENT_DB));
    CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->stableRent, ctgMgmt.cfg.stableRentSec, CTG_RENT_STABLE));

    code = taosHashPut(ctgMgmt.pCluster, &clusterId, sizeof(clusterId), &clusterCtg, POINTER_BYTES);
    if (code) {
      if (HASH_NODE_EXIST(code)) {
        ctgFreeHandle(clusterCtg);
        continue;
      }
      
      qError("taosHashPut CTG to cache failed, clusterId:%"PRIx64, clusterId);
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
    }

    qDebug("add CTG to cache, clusterId:%"PRIx64", CTG:%p", clusterId, clusterCtg);

    break;
D
dapan 已提交
926
  }
D
dapan1121 已提交
927 928

  *catalogHandle = clusterCtg;
D
dapan 已提交
929
  
D
dapan1121 已提交
930
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
931 932 933 934 935 936 937 938 939 940 941 942 943 944

_return:

  ctgFreeHandle(clusterCtg);
  
  CTG_RET(code);
}

/*
 * Public wrapper around ctgFreeHandle(); a NULL handle is ignored.
 */
void catalogFreeHandle(struct SCatalog* pCatalog) {
  if (NULL != pCatalog) {
    ctgFreeHandle(pCatalog);
  }
}

D
dapan1121 已提交
947 948
int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version) {
  if (NULL == pCatalog || NULL == dbName || NULL == version) {
D
dapan1121 已提交
949
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
950 951 952 953
  }

  if (NULL == pCatalog->dbCache.cache) {
    *version = CTG_DEFAULT_INVALID_VERSION;
D
dapan1121 已提交
954
    ctgInfo("empty db cache, dbName:%s", dbName);
D
dapan1121 已提交
955 956 957
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
958
  SDBVgroupInfo * dbInfo = taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName));
D
dapan1121 已提交
959 960
  if (NULL == dbInfo) {
    *version = CTG_DEFAULT_INVALID_VERSION;
D
dapan1121 已提交
961
    ctgInfo("db not in cache, dbName:%s", dbName);
D
dapan1121 已提交
962 963 964
    return TSDB_CODE_SUCCESS;
  }

965
  *version = dbInfo->vgVersion;
D
dapan1121 已提交
966
  taosHashRelease(pCatalog->dbCache.cache, dbInfo);
D
dapan1121 已提交
967

D
dapan1121 已提交
968 969
  ctgDebug("Got db vgVersion from cache, dbName:%s, vgVersion:%d", dbName, *version);

D
dapan1121 已提交
970 971 972
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
973 974 975 976 977 978 979 980 981 982 983 984 985 986
/*
 * Build an array holding a copy of every vgroup entry of a db.
 *
 * pCatalog    - catalog handle
 * pRpc        - transport handle forwarded to ctgGetDBVgroup
 * pMgmtEps    - mnode endpoint set forwarded to ctgGetDBVgroup
 * dbName      - db to query
 * forceUpdate - forwarded to ctgGetDBVgroup
 * vgroupList  - out: on success receives a new SArray of SVgroupInfo; it is
 *               left untouched on failure
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_CTG_INVALID_INPUT for a NULL argument,
 * TSDB_CODE_CTG_MEM_ERROR if the array cannot be created or grown, or the
 * error from ctgGetDBVgroup.
 */
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SArray** vgroupList) {
  if (!(pCatalog && dbName && pRpc && pMgmtEps && vgroupList)) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  int32_t        code = 0;
  SDBVgroupInfo *db = NULL;
  SArray        *vgList = NULL;

  CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, dbName, forceUpdate, &db));

  vgList = taosArrayInit(taosHashGetSize(db->vgInfo), sizeof(SVgroupInfo));
  if (NULL == vgList) {
    ctgError("taosArrayInit %d failed", taosHashGetSize(db->vgInfo));
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
  }

  // NOTE(review): the error exit below leaves the iteration early without
  // cancelling the iterator - confirm whether the hash API requires that.
  for (void *pIter = taosHashIterate(db->vgInfo, NULL); NULL != pIter; pIter = taosHashIterate(db->vgInfo, pIter)) {
    SVgroupInfo *vg = pIter;

    if (NULL == taosArrayPush(vgList, vg)) {
      ctgError("taosArrayPush failed, vgId:%d", vg->vgId);
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
    }
  }

  // Success: hand the array to the caller so the cleanup below skips it.
  *vgroupList = vgList;
  vgList = NULL;

_return:

  // A non-NULL db is read-unlocked and released back to the cache here.
  if (NULL != db) {
    CTG_UNLOCK(CTG_READ, &db->lock);
    taosHashRelease(pCatalog->dbCache.cache, db);
  }

  if (NULL != vgList) {
    taosArrayDestroy(vgList);
    vgList = NULL;
  }

  CTG_RET(code);
}


D
dapan1121 已提交
1023
/*
 * Install or refresh a db's vgroup info in the catalog's db cache.
 *
 * pCatalog - catalog handle; its db cache is created here on first use
 * dbName   - cache key
 * dbInfo   - new vgroup info. dbInfo->vgInfo is always consumed: after a
 *            successful put the pointer is cleared (the cached copy
 *            presumably keeps using the hash), on every other path the hash
 *            is destroyed at _return.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_CTG_INVALID_INPUT for a NULL argument,
 * TSDB_CODE_CTG_MEM_ERROR for an invalid vgInfo/vgVersion or a failed cache
 * init/put, or the error reported by a helper.
 */
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) {
  int32_t code = 0;

  if (NULL == pCatalog || NULL == dbName || NULL == dbInfo) {
    CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  if (NULL == dbInfo->vgInfo || dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgInfo) <= 0) {
    ctgError("invalid db vgInfo, dbName:%s, vgInfo:%p, vgVersion:%d", dbName, dbInfo->vgInfo, dbInfo->vgVersion);
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
  }

  // NOTE(review): this branch is unreachable as written - a negative
  // vgVersion is already rejected by the validation above. Confirm whether
  // negative versions were meant to reach this removal path instead.
  if (dbInfo->vgVersion < 0) {
    ctgWarn("db vgVersion less than 0, dbName:%s, vgVersion:%d", dbName, dbInfo->vgVersion);

    if (pCatalog->dbCache.cache) {
      CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbName, dbInfo));

      CTG_ERR_JRET(taosHashRemove(pCatalog->dbCache.cache, dbName, strlen(dbName)));
    }

    ctgWarn("db removed from cache, db:%s", dbName);
    goto _return;
  }

  if (NULL == pCatalog->dbCache.cache) {
    SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
    if (NULL == cache) {
      // Log the size that was actually requested, not the compile-time default.
      ctgError("taosHashInit %d failed", (int32_t)ctgMgmt.cfg.maxDBCacheNum);
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
    }

    // Install the new cache only if none is set yet; otherwise drop ours and
    // use the one already in place.
    if (NULL != atomic_val_compare_exchange_ptr(&pCatalog->dbCache.cache, NULL, cache)) {
      taosHashCleanup(cache);
    }
  } else {
    CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbName, dbInfo));
  }

  bool newAdded = false;
  if (taosHashPutExt(pCatalog->dbCache.cache, dbName, strlen(dbName), dbInfo, sizeof(*dbInfo), &newAdded) != 0) {
    ctgError("taosHashPutExt db vgroup to cache failed, db:%s", dbName);
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
  }

  // The put succeeded: clear the pointer so the cleanup at _return does not
  // destroy the hash.
  dbInfo->vgInfo = NULL;

  // Keep the db rent list in step with the cache: add new dbs, update known ones.
  SDbVgVersion vgVersion = {.dbId = dbInfo->dbId, .vgVersion = dbInfo->vgVersion};
  if (newAdded) {
    CTG_ERR_JRET(ctgMetaRentAdd(&pCatalog->dbRent, &vgVersion, dbInfo->dbId, sizeof(SDbVgVersion)));
  } else {
    CTG_ERR_JRET(ctgMetaRentUpdate(&pCatalog->dbRent, &vgVersion, dbInfo->dbId, sizeof(SDbVgVersion), ctgDbVgVersionCompare));
  }

  ctgDebug("dbName:%s vgroup updated, vgVersion:%d", dbName, dbInfo->vgVersion);


_return:

  // Still non-NULL means the hash was not handed to the cache: destroy it.
  if (dbInfo && dbInfo->vgInfo) {
    taosHashCleanup(dbInfo->vgInfo);
    dbInfo->vgInfo = NULL;
  }

  CTG_RET(code);
}

H
Haojun Liao 已提交
1090
/*
 * Get a table's meta without forcing a refresh. Forwards to ctgGetTableMeta
 * with a table-kind hint of -1 (presumably "unknown" - confirm against
 * CTG_IS_UNKNOWN_STABLE).
 */
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
  const bool    refresh = false;
  const int32_t tbKindHint = -1;

  return ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, refresh, pTableMeta, tbKindHint);
}
D
dapan1121 已提交
1093

D
dapan1121 已提交
1094
/*
 * Get a table's meta without forcing a refresh. Forwards to ctgGetTableMeta
 * with a table-kind hint of 1 (presumably "super table" - confirm against
 * the CTG_*_STABLE macros).
 */
int32_t catalogGetSTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
  const bool    refresh = false;
  const int32_t tbKindHint = 1;

  return ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, refresh, pTableMeta, tbKindHint);
}

int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) {
1099
  if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) {
D
dapan1121 已提交
1100
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan 已提交
1101 1102
  }

D
dapan1121 已提交
1103
  return ctgRenewTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, isSTable);
1104
}
1105

D
dapan1121 已提交
1106
/*
 * Force a refresh of a table's meta and return the refreshed copy. Forwards
 * to ctgGetTableMeta with forceUpdate set; argument checking happens there.
 */
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable) {
  const bool refresh = true;

  return ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, refresh, pTableMeta, isSTable);
}

H
Haojun Liao 已提交
1110 1111
/*
 * Return the vgroup(s) a table is distributed over.
 *
 * For a super table the list comes from ctgGetVgInfoFromDB; for any other
 * table kind it is a one-element array holding the vgroup whose id matches
 * the table meta's vgId.
 *
 * pCatalog    - catalog handle
 * pRpc        - transport handle forwarded to the helpers
 * pMgmtEps    - mnode endpoint set forwarded to the helpers
 * pTableName  - table to query
 * pVgroupList - out: set to NULL first, then to the resulting SArray
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_CTG_INVALID_INPUT for a NULL argument,
 * TSDB_CODE_CTG_INTERNAL_ERROR if the vgId is not in the db's vgroup hash or
 * the array push fails, TSDB_CODE_CTG_MEM_ERROR if the array cannot be
 * created, or the error reported by a helper.
 */
int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgroupList) {
  if (!(pCatalog && pRpc && pMgmtEps && pTableName && pVgroupList)) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  int32_t        code = 0;
  STableMeta    *tbMeta = NULL;
  SDBVgroupInfo *dbVgroup = NULL;
  SArray        *vgList = NULL;
  SVgroupInfo    vgroupInfo = {0};
  char           db[TSDB_DB_FNAME_LEN] = {0};

  *pVgroupList = NULL;

  CTG_ERR_JRET(ctgGetTableMeta(pCatalog, pRpc, pMgmtEps, pTableName, false, &tbMeta, -1));

  tNameGetFullDbName(pTableName, db);
  CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, db, false, &dbVgroup));

  if (TSDB_SUPER_TABLE != tbMeta->tableType) {
    // Not a super table: look up the single vgroup named by the table meta.
    int32_t vgId = tbMeta->vgId;

    if (NULL == taosHashGetClone(dbVgroup->vgInfo, &vgId, sizeof(vgId), &vgroupInfo)) {
      ctgError("table's vgId not found in vgroup list, vgId:%d, tbName:%s", vgId, pTableName->tname);
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
    }

    vgList = taosArrayInit(1, sizeof(SVgroupInfo));
    if (NULL == vgList) {
      ctgError("taosArrayInit %d failed", (int32_t)sizeof(SVgroupInfo));
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
    }

    if (NULL == taosArrayPush(vgList, &vgroupInfo)) {
      ctgError("taosArrayPush vgroupInfo to array failed, vgId:%d, tbName:%s", vgId, pTableName->tname);
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
    }

    // Hand the array to the caller so the cleanup below skips it.
    *pVgroupList = vgList;
    vgList = NULL;
  } else {
    CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, dbVgroup, pVgroupList));
  }

_return:
  tfree(tbMeta);

  // A non-NULL dbVgroup is read-unlocked and released back to the cache here.
  if (NULL != dbVgroup) {
    CTG_UNLOCK(CTG_READ, &dbVgroup->lock);
    taosHashRelease(pCatalog->dbCache.cache, dbVgroup);
  }

  if (NULL != vgList) {
    taosArrayDestroy(vgList);
    vgList = NULL;
  }

  CTG_RET(code);
}


H
Haojun Liao 已提交
1170
/*
 * Find the vgroup a table maps to, via ctgGetVgInfoFromHashValue on the
 * vgroup info of the table's db.
 *
 * pCatalog     - catalog handle
 * pTransporter - transport handle forwarded to ctgGetDBVgroup
 * pMgmtEps     - mnode endpoint set forwarded to ctgGetDBVgroup
 * pTableName   - table to locate; its full db name selects the db
 * pVgroup      - out: filled in by ctgGetVgInfoFromHashValue
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_CTG_INVALID_INPUT for a NULL argument,
 * or the error reported by a helper.
 */
int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter, const SEpSet *pMgmtEps, const SName *pTableName, SVgroupInfo *pVgroup) {
  // Reject NULL arguments up front, as the other public entry points in this
  // file do; pTableName and pCatalog are used without further checks below.
  if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName || NULL == pVgroup) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  SDBVgroupInfo* dbInfo = NULL;
  int32_t code = 0;

  char db[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, db);

  CTG_ERR_RET(ctgGetDBVgroup(pCatalog, pTransporter, pMgmtEps, db, false, &dbInfo));

  CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCatalog, dbInfo, pTableName, pVgroup));

_return:

  // A non-NULL dbInfo is read-unlocked and released back to the cache here.
  if (dbInfo) {
    CTG_UNLOCK(CTG_READ, &dbInfo->lock);
    taosHashRelease(pCatalog->dbCache.cache, dbInfo);
  }

  CTG_RET(code);
}


D
dapan1121 已提交
1192 1193
/*
 * Resolve the table meta for every table named in a request.
 *
 * pCatalog - catalog handle
 * pRpc     - transport handle forwarded to ctgGetTableMeta
 * pMgmtEps - mnode endpoint set forwarded to ctgGetTableMeta
 * pReq     - request; only pReq->pTableName (an array of SName) is read here
 * pRsp     - response; pRsp->pTableMeta receives an array of STableMeta
 *            pointers in request order, and is reset to NULL when a lookup
 *            or push fails
 *
 * Returns TSDB_CODE_SUCCESS (also when the request has no table list),
 * TSDB_CODE_CTG_INVALID_INPUT for a NULL argument or an empty table list,
 * TSDB_CODE_CTG_MEM_ERROR on allocation failure, or the error from
 * ctgGetTableMeta.
 */
int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp) {
  if (!(pCatalog && pRpc && pMgmtEps && pReq && pRsp)) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  int32_t code = 0;

  // Nothing requested: nothing to do.
  if (NULL == pReq->pTableName) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t tbNum = (int32_t)taosArrayGetSize(pReq->pTableName);
  if (tbNum <= 0) {
    ctgError("empty table name list, tbNum:%d", tbNum);
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  pRsp->pTableMeta = taosArrayInit(tbNum, POINTER_BYTES);
  if (NULL == pRsp->pTableMeta) {
    ctgError("taosArrayInit %d failed", tbNum);
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
  }

  for (int32_t idx = 0; idx < tbNum; ++idx) {
    SName      *tbName = taosArrayGet(pReq->pTableName, idx);
    STableMeta *meta = NULL;

    CTG_ERR_JRET(ctgGetTableMeta(pCatalog, pRpc, pMgmtEps, tbName, false, &meta, -1));

    if (NULL == taosArrayPush(pRsp->pTableMeta, &meta)) {
      ctgError("taosArrayPush failed, idx:%d", idx);
      // Not stored in the array, so the cleanup below would miss it.
      tfree(meta);
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
    }
  }

  return TSDB_CODE_SUCCESS;

_return:

  // Failure: release every meta collected so far, then the array itself.
  if (NULL != pRsp->pTableMeta) {
    int32_t collected = taosArrayGetSize(pRsp->pTableMeta);
    for (int32_t idx = 0; idx < collected; ++idx) {
      STableMeta *stored = taosArrayGetP(pRsp->pTableMeta, idx);
      tfree(stored);
    }

    taosArrayDestroy(pRsp->pTableMeta);
    pRsp->pTableMeta = NULL;
  }

  CTG_RET(code);
}
D
dapan 已提交
1243

D
dapan1121 已提交
1244 1245
/*
 * Placeholder for fetching the qnode list: only the arguments are checked.
 *
 * Returns TSDB_CODE_CTG_INVALID_INPUT if any argument is NULL, otherwise
 * TSDB_CODE_SUCCESS without touching pQnodeList.
 */
int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray* pQnodeList) {
  if (!(pCatalog && pRpc && pMgmtEps && pQnodeList)) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  //TODO

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269
/*
 * Query the catalog's super-table rent list through ctgMetaRentGet.
 *
 * stables / num - out parameters filled in by ctgMetaRentGet with
 *                 SSTableMetaVersion-sized entries
 *
 * Returns TSDB_CODE_CTG_INVALID_INPUT for a NULL argument, otherwise the
 * result of ctgMetaRentGet.
 */
int32_t catalogGetExpiredSTables(struct SCatalog* pCatalog, SSTableMetaVersion **stables, uint32_t *num) {
  if (!(pCatalog && stables && num)) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  int32_t code = ctgMetaRentGet(&pCatalog->stableRent, (void **)stables, num, sizeof(SSTableMetaVersion));

  CTG_RET(code);
}

/*
 * Query the catalog's db rent list through ctgMetaRentGet.
 *
 * dbs / num - out parameters filled in by ctgMetaRentGet with
 *             SDbVgVersion-sized entries
 *
 * Returns TSDB_CODE_CTG_INVALID_INPUT for a NULL argument, otherwise the
 * result of ctgMetaRentGet.
 */
int32_t catalogGetExpiredDBs(struct SCatalog* pCatalog, SDbVgVersion **dbs, uint32_t *num) {
  if (!(pCatalog && dbs && num)) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  int32_t code = ctgMetaRentGet(&pCatalog->dbRent, (void **)dbs, num, sizeof(SDbVgVersion));

  CTG_RET(code);
}

D
dapan 已提交
1270

D
dapan 已提交
1271 1272 1273 1274 1275
/*
 * Tear down the global catalog manager: destroy the cluster cache hash, if
 * there is one, and clear the pointer. Only the hash itself is cleaned up
 * here.
 */
void catalogDestroy(void) {
  if (NULL != ctgMgmt.pCluster) {
    taosHashCleanup(ctgMgmt.pCluster); //TBD
    ctgMgmt.pCluster = NULL;
  }

  qInfo("catalog destroyed");
}