catalog.c 25.4 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
H
Haojun Liao 已提交
14 15
 */

D
dapan1121 已提交
16
#include "trpc.h"
D
dapan1121 已提交
17
#include "query.h"
D
dapan1121 已提交
18
#include "tname.h"
H
Haojun Liao 已提交
19
#include "catalogInt.h"
20

D
dapan1121 已提交
21 22
SCatalogMgmt ctgMgmt = {0};

D
dapan1121 已提交
23
int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, SDBVgroupInfo **dbInfo, bool *inCache) {
D
dapan1121 已提交
24
  if (NULL == pCatalog->dbCache.cache) {
D
dapan1121 已提交
25
    *inCache = false;
D
dapan1121 已提交
26
    ctgWarn("no db cache");
D
dapan1121 已提交
27 28 29
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
30
  SDBVgroupInfo *info = NULL;
D
dapan1121 已提交
31

D
dapan1121 已提交
32 33
  while (true) {
    info = taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName));
D
dapan1121 已提交
34

D
dapan1121 已提交
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
    if (NULL == info) {
      *inCache = false;
      ctgWarn("no db cache, dbName:%s", dbName);
      return TSDB_CODE_SUCCESS;
    }

    CTG_LOCK(CTG_READ, &info->lock);
    if (NULL == info->vgInfo) {
      CTG_UNLOCK(CTG_READ, &info->lock);
      taosHashRelease(pCatalog->dbCache.cache, info);
      ctgWarn("db cache vgInfo is NULL, dbName:%s", dbName);
      
      continue;
    }

    break;
D
dapan1121 已提交
51
  }
D
dapan1121 已提交
52

D
dapan1121 已提交
53 54
  *dbInfo = info;
  *inCache = true;
D
dapan1121 已提交
55 56 57 58 59 60 61 62 63 64 65
  
  return TSDB_CODE_SUCCESS;
}



int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SBuildUseDBInput *input, SUseDbOutput *out) {
  char *msg = NULL;
  SEpSet *pVnodeEpSet = NULL;
  int32_t msgLen = 0;

D
catalog  
dapan1121 已提交
66
  CTG_ERR_RET(queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)](input, &msg, 0, &msgLen));
D
ut test  
dapan1121 已提交
67
  
D
dapan1121 已提交
68
  SRpcMsg rpcMsg = {
H
Hongze Cheng 已提交
69
      .msgType = TDMT_MND_USE_DB,
D
catalog  
dapan1121 已提交
70
      .pCont   = msg,
D
dapan1121 已提交
71 72 73 74 75 76
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};

  rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
77 78
  if (TSDB_CODE_SUCCESS != rpcRsp.code) {
    ctgError("error rsp for use db, code:%x", rpcRsp.code);
D
dapan1121 已提交
79
    CTG_ERR_RET(rpcRsp.code);
D
dapan1121 已提交
80
  }
D
dapan1121 已提交
81

D
catalog  
dapan1121 已提交
82
  CTG_ERR_RET(queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)](out, rpcRsp.pCont, rpcRsp.contLen));
D
dapan1121 已提交
83

D
dapan1121 已提交
84 85
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
86 87


H
Haojun Liao 已提交
88
int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableName, STableMeta** pTableMeta, int32_t *exist) {
D
dapan1121 已提交
89 90 91 92 93
  if (NULL == pCatalog->tableCache.cache) {
    *exist = 0;
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
94
  char tbFullName[TSDB_TABLE_FNAME_LEN];
H
Haojun Liao 已提交
95
  tNameExtractFullName(pTableName, tbFullName);
D
dapan1121 已提交
96

D
dapan1121 已提交
97 98 99 100
  *pTableMeta = NULL;

  size_t sz = 0;
  STableMeta *tbMeta = taosHashGetCloneExt(pCatalog->tableCache.cache, tbFullName, strlen(tbFullName), NULL, (void **)pTableMeta, &sz);
D
dapan1121 已提交
101

D
dapan1121 已提交
102
  if (NULL == *pTableMeta) {
D
dapan1121 已提交
103 104 105 106
    *exist = 0;
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
107
  *exist = 1;
D
dapan1121 已提交
108

D
dapan1121 已提交
109 110 111 112 113 114 115 116 117 118 119 120 121 122
  if (tbMeta->tableType != TSDB_CHILD_TABLE) {
    return TSDB_CODE_SUCCESS;
  }
  
  CTG_LOCK(CTG_READ, &pCatalog->tableCache.stableLock);
  
  STableMeta **stbMeta = taosHashGet(pCatalog->tableCache.stableCache, &tbMeta->suid, sizeof(tbMeta->suid));
  if (NULL == stbMeta || NULL == *stbMeta) {
    CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock);
    qError("no stable:%"PRIx64 " meta in cache", tbMeta->suid);
    tfree(*pTableMeta);
    *exist = 0;
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
123

D
dapan1121 已提交
124 125 126 127 128 129
  if ((*stbMeta)->suid != tbMeta->suid) {    
    CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock);
    tfree(*pTableMeta);
    ctgError("stable cache error, expected suid:%"PRId64 ",actual suid:%"PRId64, tbMeta->suid, (*stbMeta)->suid);
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }
D
dapan1121 已提交
130

D
dapan1121 已提交
131 132 133 134 135 136
  int32_t metaSize = sizeof(STableMeta) + ((*stbMeta)->tableInfo.numOfTags + (*stbMeta)->tableInfo.numOfColumns) * sizeof(SSchema);
  *pTableMeta = realloc(*pTableMeta, metaSize);
  if (NULL == *pTableMeta) {    
    CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock);
    ctgError("calloc size[%d] failed", metaSize);
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
137 138
  }

D
dapan1121 已提交
139 140 141
  memcpy(&(*pTableMeta)->sversion, &(*stbMeta)->sversion, metaSize - sizeof(SCTableMeta));

  CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock);
D
dapan1121 已提交
142 143 144 145 146 147 148 149 150 151 152 153 154 155
  
  return TSDB_CODE_SUCCESS;
}

void ctgGenEpSet(SEpSet *epSet, SVgroupInfo *vgroupInfo) {
  epSet->inUse = 0;
  epSet->numOfEps = vgroupInfo->numOfEps;

  for (int32_t i = 0; i < vgroupInfo->numOfEps; ++i) {
    memcpy(&epSet->port[i], &vgroupInfo->epAddr[i].port, sizeof(epSet->port[i]));
    memcpy(&epSet->fqdn[i], &vgroupInfo->epAddr[i].fqdn, sizeof(epSet->fqdn[i]));
  }
}

H
Haojun Liao 已提交
156 157
int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, STableMetaOutput* output) {
  if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == output) {
D
dapan1121 已提交
158 159 160 161
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  char tbFullName[TSDB_TABLE_FNAME_LEN];
H
Haojun Liao 已提交
162
  tNameExtractFullName(pTableName, tbFullName);
D
dapan1121 已提交
163

D
dapan1121 已提交
164
  SBuildTableMetaInput bInput = {.vgId = 0, .dbName = NULL, .tableFullName = tbFullName};
D
dapan1121 已提交
165 166 167 168
  char *msg = NULL;
  SEpSet *pVnodeEpSet = NULL;
  int32_t msgLen = 0;

D
catalog  
dapan1121 已提交
169
  CTG_ERR_RET(queryBuildMsg[TMSG_INDEX(TDMT_MND_STB_META)](&bInput, &msg, 0, &msgLen));
D
dapan1121 已提交
170 171 172 173 174 175

  SRpcMsg rpcMsg = {
      .msgType = TDMT_MND_STB_META,
      .pCont   = msg,
      .contLen = msgLen,
  };
D
dapan1121 已提交
176

D
dapan1121 已提交
177 178 179 180 181 182 183 184 185
  SRpcMsg rpcRsp = {0};

  rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
  
  if (TSDB_CODE_SUCCESS != rpcRsp.code) {
    ctgError("error rsp for table meta, code:%x", rpcRsp.code);
    CTG_ERR_RET(rpcRsp.code);
  }

D
catalog  
dapan1121 已提交
186
  CTG_ERR_RET(queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_STB_META)](output, rpcRsp.pCont, rpcRsp.contLen));
D
dapan1121 已提交
187 188 189 190 191

  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
192 193
int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) {
  if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == vgroupInfo || NULL == output) {
D
dapan1121 已提交
194
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
195 196
  }

D
dapan1121 已提交
197 198
  char dbFullName[TSDB_DB_FNAME_LEN];
  tNameGetFullDbName(pTableName, dbFullName);
D
dapan1121 已提交
199

D
dapan1121 已提交
200
  SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .dbName = dbFullName, .tableFullName = pTableName->tname};
D
dapan1121 已提交
201 202 203 204
  char *msg = NULL;
  SEpSet *pVnodeEpSet = NULL;
  int32_t msgLen = 0;

D
catalog  
dapan1121 已提交
205
  CTG_ERR_RET(queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)](&bInput, &msg, 0, &msgLen));
D
dapan1121 已提交
206 207

  SRpcMsg rpcMsg = {
H
Hongze Cheng 已提交
208
      .msgType = TDMT_VND_TABLE_META,
D
dapan1121 已提交
209 210 211 212 213 214 215 216 217 218
      .pCont   = msg,
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
  SEpSet  epSet;
  
  ctgGenEpSet(&epSet, vgroupInfo);

  rpcSendRecv(pRpc, &epSet, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
219
  
D
dapan1121 已提交
220
  if (TSDB_CODE_SUCCESS != rpcRsp.code) {
D
dapan1121 已提交
221
    ctgError("error rsp for table meta, code:%x", rpcRsp.code);
D
dapan1121 已提交
222
    CTG_ERR_RET(rpcRsp.code);
D
dapan1121 已提交
223 224
  }

D
catalog  
dapan1121 已提交
225
  CTG_ERR_RET(queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)](output, rpcRsp.pCont, rpcRsp.contLen));
D
dapan1121 已提交
226 227 228 229 230

  return TSDB_CODE_SUCCESS;
}


231 232
int32_t ctgGetHashFunction(int8_t hashMethod, tableNameHashFp *fp) {
  switch (hashMethod) {
D
dapan1121 已提交
233 234 235 236 237 238 239 240
    default:
      *fp = MurmurHash3_32;
      break;
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
241
int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, SDBVgroupInfo *dbInfo, SArray** vgroupList) {
D
dapan1121 已提交
242
  SHashObj *vgroupHash = NULL;
243
  SVgroupInfo *vgInfo = NULL;
D
dapan1121 已提交
244 245
  SArray *vgList = NULL;
  int32_t code = 0;
246

D
dapan1121 已提交
247 248
  vgList = taosArrayInit(taosHashGetSize(dbInfo->vgInfo), sizeof(SVgroupInfo));
  if (NULL == vgList) {
D
dapan 已提交
249 250 251 252
    ctgError("taosArrayInit failed");
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);    
  }

253 254 255
  void *pIter = taosHashIterate(dbInfo->vgInfo, NULL);
  while (pIter) {
    vgInfo = pIter;
D
dapan1121 已提交
256

D
dapan1121 已提交
257
    if (NULL == taosArrayPush(vgList, vgInfo)) {
258
      ctgError("taosArrayPush failed");
D
dapan1121 已提交
259
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
260 261 262 263
    }
    
    pIter = taosHashIterate(dbInfo->vgInfo, pIter);
    vgInfo = NULL;
D
dapan1121 已提交
264 265
  }

D
dapan1121 已提交
266 267 268
  *vgroupList = vgList;
  vgList = NULL;

D
dapan1121 已提交
269
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
270 271 272 273 274 275 276 277

_return:

  if (vgList) {
    taosArrayDestroy(vgList);
  }

  CTG_RET(code);
D
dapan1121 已提交
278 279
}

H
Haojun Liao 已提交
280
int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup) {
D
dapan1121 已提交
281 282
  int32_t code = 0;
  
283
  int32_t vgNum = taosHashGetSize(dbInfo->vgInfo);
H
Haojun Liao 已提交
284 285 286
  char db[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, db);

287
  if (vgNum <= 0) {
H
Haojun Liao 已提交
288
    ctgError("db[%s] vgroup cache invalid, vgroup number:%d", db, vgNum);
D
dapan1121 已提交
289
    CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED);
D
dapan1121 已提交
290 291
  }

292 293
  tableNameHashFp fp = NULL;
  SVgroupInfo *vgInfo = NULL;
D
dapan1121 已提交
294

D
dapan1121 已提交
295
  CTG_ERR_JRET(ctgGetHashFunction(dbInfo->hashMethod, &fp));
296 297

  char tbFullName[TSDB_TABLE_FNAME_LEN];
H
Haojun Liao 已提交
298
  tNameExtractFullName(pTableName, tbFullName);
299 300 301 302 303 304 305 306

  uint32_t hashValue = (*fp)(tbFullName, (uint32_t)strlen(tbFullName));

  void *pIter = taosHashIterate(dbInfo->vgInfo, NULL);
  while (pIter) {
    vgInfo = pIter;
    if (hashValue >= vgInfo->hashBegin && hashValue <= vgInfo->hashEnd) {
      break;
D
dapan1121 已提交
307
    }
308 309 310
    
    pIter = taosHashIterate(dbInfo->vgInfo, pIter);
    vgInfo = NULL;
D
dapan1121 已提交
311 312
  }

313 314
  if (NULL == vgInfo) {
    ctgError("no hash range found for hashvalue[%u]", hashValue);
D
dapan1121 已提交
315
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
316 317 318 319
  }

  *pVgroup = *vgInfo;

D
dapan1121 已提交
320 321 322
_return:
  
  CTG_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
323 324
}

H
Haojun Liao 已提交
325 326
int32_t ctgGetTableMetaImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, bool forceUpdate, STableMeta** pTableMeta) {
  if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) {
D
dapan1121 已提交
327
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
328
  }
D
dapan1121 已提交
329
  
D
dapan1121 已提交
330 331 332
  int32_t exist = 0;

  if (!forceUpdate) {  
H
Haojun Liao 已提交
333
    CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pTableName, pTableMeta, &exist));
D
dapan1121 已提交
334 335 336 337 338 339

    if (exist) {
      return TSDB_CODE_SUCCESS;
    }
  }

H
Haojun Liao 已提交
340
  CTG_ERR_RET(catalogRenewTableMeta(pCatalog, pRpc, pMgmtEps, pTableName));
D
dapan1121 已提交
341

H
Haojun Liao 已提交
342
  CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pTableName, pTableMeta, &exist));
D
dapan1121 已提交
343 344 345

  if (0 == exist) {
    ctgError("get table meta from cache failed, but fetch succeed");
D
dapan1121 已提交
346
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
347 348 349 350 351 352
  }
  
  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
353
int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output) {
D
dapan1121 已提交
354 355
  int32_t code = 0;
  
D
dapan1121 已提交
356 357
  if (output->metaNum != 1 && output->metaNum != 2) {
    ctgError("invalid table meta number[%d] got from meta rsp", output->metaNum);
D
dapan1121 已提交
358
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
359 360 361 362
  }

  if (NULL == output->tbMeta) {
    ctgError("no valid table meta got from meta rsp");
D
dapan1121 已提交
363
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
364 365 366 367 368 369
  }

  if (NULL == pCatalog->tableCache.cache) {
    pCatalog->tableCache.cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
    if (NULL == pCatalog->tableCache.cache) {
      ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum);
D
dapan1121 已提交
370
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
371
    }
D
dapan1121 已提交
372
  }
D
dapan1121 已提交
373

D
dapan1121 已提交
374
  if (NULL == pCatalog->tableCache.stableCache) {
D
dapan1121 已提交
375 376 377
    pCatalog->tableCache.stableCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
    if (NULL == pCatalog->tableCache.stableCache) {
      ctgError("init hash[%d] for stablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum);
D
dapan1121 已提交
378
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
379 380 381 382 383 384
    }
  }

  if (output->metaNum == 2) {
    if (taosHashPut(pCatalog->tableCache.cache, output->ctbFname, strlen(output->ctbFname), &output->ctbMeta, sizeof(output->ctbMeta)) != 0) {
      ctgError("push ctable[%s] to table cache failed", output->ctbFname);
D
dapan1121 已提交
385
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
386 387 388 389
    }

    if (TSDB_SUPER_TABLE != output->tbMeta->tableType) {
      ctgError("table type[%d] error, expected:%d", output->tbMeta->tableType, TSDB_SUPER_TABLE);
D
dapan1121 已提交
390
      CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
391 392 393
    }    
  }

D
dapan1121 已提交
394
  int32_t tbSize = sizeof(*output->tbMeta) + sizeof(SSchema) * (output->tbMeta->tableInfo.numOfColumns + output->tbMeta->tableInfo.numOfTags);
D
dapan1121 已提交
395 396

  if (TSDB_SUPER_TABLE == output->tbMeta->tableType) {
D
dapan1121 已提交
397 398 399 400
    CTG_LOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
    if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) {
      CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
      ctgError("push table[%s] to table cache failed", output->tbFname);
D
dapan1121 已提交
401
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
402 403 404 405 406
    }

    STableMeta *tbMeta = taosHashGet(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname));
    if (taosHashPut(pCatalog->tableCache.stableCache, &output->tbMeta->suid, sizeof(output->tbMeta->suid), &tbMeta, POINTER_BYTES) != 0) {
      CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
D
dapan1121 已提交
407
      ctgError("push suid[%"PRIu64"] to stable cache failed", output->tbMeta->suid);
D
dapan1121 已提交
408
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
409 410 411 412 413
    }
    CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
  } else {
    if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) {
      ctgError("push table[%s] to table cache failed", output->tbFname);
D
dapan1121 已提交
414
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
415 416
    }
  }
D
dapan1121 已提交
417 418

  CTG_RET(code);
D
dapan1121 已提交
419 420
}

D
dapan1121 已提交
421

D
dapan1121 已提交
422 423
int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo** dbInfo) {
  bool inCache = false;
D
dapan1121 已提交
424
  if (0 == forceUpdate) {
D
dapan1121 已提交
425
    CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &inCache));
D
dapan1121 已提交
426

D
dapan1121 已提交
427
    if (inCache) {
D
dapan1121 已提交
428 429 430 431 432 433 434 435 436 437
      return TSDB_CODE_SUCCESS;
    }
  }

  SUseDbOutput DbOut = {0};
  SBuildUseDBInput input = {0};

  strncpy(input.db, dbName, sizeof(input.db));
  input.db[sizeof(input.db) - 1] = 0;
  input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
H
Haojun Liao 已提交
438

D
dapan1121 已提交
439 440
  while (true) {
    CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &input, &DbOut));
D
dapan1121 已提交
441

D
dapan1121 已提交
442
    CTG_ERR_RET(catalogUpdateDBVgroup(pCatalog, dbName, &DbOut.dbVgroup));
D
dapan1121 已提交
443

D
dapan1121 已提交
444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479
    CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &inCache));

    if (!inCache) {
      ctgWarn("get db vgroup from cache failed, db:%s", dbName);
      continue;
    }

    break;
  }

  return TSDB_CODE_SUCCESS;
}


int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) {
  SDBVgroupInfo *oldInfo = (SDBVgroupInfo *)taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName));
  if (oldInfo) {
    CTG_LOCK(CTG_WRITE, &oldInfo->lock);
    if (dbInfo->vgVersion <= oldInfo->vgVersion) {
      ctgInfo("dbName:%s vg will not update, vgVersion:%d , current:%d", dbName, dbInfo->vgVersion, oldInfo->vgVersion);
      CTG_UNLOCK(CTG_WRITE, &oldInfo->lock);
      taosHashRelease(pCatalog->dbCache.cache, oldInfo);
      
      return TSDB_CODE_SUCCESS;
    }
    
    if (oldInfo->vgInfo) {
      ctgInfo("dbName:%s vg will be cleanup", dbName);
      taosHashCleanup(oldInfo->vgInfo);
      oldInfo->vgInfo = NULL;
    }
    
    CTG_UNLOCK(CTG_WRITE, &oldInfo->lock);
  
    taosHashRelease(pCatalog->dbCache.cache, oldInfo);
  }
D
dapan1121 已提交
480 481 482 483 484

  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
485
int32_t catalogInit(SCatalogCfg *cfg) {
D
dapan1121 已提交
486 487 488
  if (ctgMgmt.pCluster) {
    ctgError("catalog already init");
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
489 490 491 492
  }

  if (cfg) {
    memcpy(&ctgMgmt.cfg, cfg, sizeof(*cfg));
H
Haojun Liao 已提交
493

D
dapan1121 已提交
494 495 496 497 498 499 500
    if (ctgMgmt.cfg.maxDBCacheNum == 0) {
      ctgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
    }

    if (ctgMgmt.cfg.maxTblCacheNum == 0) {
      ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TABLEMETA_NUMBER;
    }
D
dapan1121 已提交
501 502 503
  } else {
    ctgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
    ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TABLEMETA_NUMBER;
D
dapan 已提交
504 505
  }

506
  ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
D
dapan1121 已提交
507 508
  if (NULL == ctgMgmt.pCluster) {
    CTG_ERR_LRET(TSDB_CODE_CTG_INTERNAL_ERROR, "init %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER);
D
dapan1121 已提交
509 510
  }

D
dapan 已提交
511
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
512 513
}

514 515
int32_t catalogGetHandle(uint64_t clusterId, struct SCatalog** catalogHandle) {
  if (NULL == catalogHandle) {
D
dapan1121 已提交
516
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan 已提交
517 518 519 520
  }

  if (NULL == ctgMgmt.pCluster) {
    ctgError("cluster cache are not ready");
D
dapan1121 已提交
521
    CTG_ERR_RET(TSDB_CODE_CTG_NOT_READY);
D
dapan 已提交
522 523
  }

524
  SCatalog **ctg = (SCatalog **)taosHashGet(ctgMgmt.pCluster, (char*)&clusterId, sizeof(clusterId));
D
dapan 已提交
525

D
catalog  
dapan1121 已提交
526 527
  if (ctg && (*ctg)) {
    *catalogHandle = *ctg;
D
dapan1121 已提交
528
    return TSDB_CODE_SUCCESS;
D
dapan 已提交
529 530
  }

D
catalog  
dapan1121 已提交
531
  SCatalog *clusterCtg = calloc(1, sizeof(SCatalog));
D
dapan 已提交
532
  if (NULL == clusterCtg) {
D
catalog  
dapan1121 已提交
533
    ctgError("calloc %d failed", (int32_t)sizeof(SCatalog));
D
dapan1121 已提交
534
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan 已提交
535 536
  }

537 538
  if (taosHashPut(ctgMgmt.pCluster, &clusterId, sizeof(clusterId), &clusterCtg, POINTER_BYTES)) {
    ctgError("put cluster %"PRIx64" cache to hash failed", clusterId);
D
dapan 已提交
539
    tfree(clusterCtg);
D
dapan1121 已提交
540
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan 已提交
541
  }
D
dapan1121 已提交
542 543

  *catalogHandle = clusterCtg;
D
dapan 已提交
544
  
D
dapan1121 已提交
545
  return TSDB_CODE_SUCCESS;
D
dapan 已提交
546 547
}

D
dapan1121 已提交
548 549
int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version) {
  if (NULL == pCatalog || NULL == dbName || NULL == version) {
D
dapan1121 已提交
550
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
551 552 553 554 555 556 557
  }

  if (NULL == pCatalog->dbCache.cache) {
    *version = CTG_DEFAULT_INVALID_VERSION;
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
558
  SDBVgroupInfo * dbInfo = taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName));
D
dapan1121 已提交
559 560 561 562 563
  if (NULL == dbInfo) {
    *version = CTG_DEFAULT_INVALID_VERSION;
    return TSDB_CODE_SUCCESS;
  }

564
  *version = dbInfo->vgVersion;
D
dapan1121 已提交
565
  taosHashRelease(pCatalog->dbCache.cache, dbInfo);
D
dapan1121 已提交
566 567 568 569

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SArray** vgroupList) {
  if (NULL == pCatalog || NULL == dbName || NULL == pRpc || NULL == pMgmtEps || NULL == vgroupList) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  SDBVgroupInfo* db = NULL;
  int32_t code = 0;
  SVgroupInfo *vgInfo = NULL;
  SArray *vgList = NULL;
  
  CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, dbName, forceUpdate, &db));

  vgList = taosArrayInit(taosHashGetSize(db->vgInfo), sizeof(SVgroupInfo));
  if (NULL == vgList) {
    ctgError("taosArrayInit failed");
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);    
  }

  void *pIter = taosHashIterate(db->vgInfo, NULL);
  while (pIter) {
    vgInfo = pIter;

    if (NULL == taosArrayPush(vgList, vgInfo)) {
      ctgError("taosArrayPush failed");
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
    }
    
    pIter = taosHashIterate(db->vgInfo, pIter);
    vgInfo = NULL;
  }

  *vgroupList = vgList;
  vgList = NULL;

_return:

  if (db) {
    CTG_UNLOCK(CTG_READ, &db->lock);
    taosHashRelease(pCatalog->dbCache.cache, db);
  }

  if (vgList) {
    taosArrayDestroy(vgList);
    vgList = NULL;
  }

  CTG_RET(code);  
}


D
dapan1121 已提交
620
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) {
D
dapan1121 已提交
621 622
  int32_t code = 0;
  
D
dapan1121 已提交
623
  if (NULL == pCatalog || NULL == dbName || NULL == dbInfo) {
D
dapan1121 已提交
624 625 626 627 628 629
    CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  if (NULL == dbInfo->vgInfo || dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgInfo) <= 0) {
    ctgError("invalid db vg, dbName:%s", dbName);
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
630 631
  }

632
  if (dbInfo->vgVersion < 0) {
D
dapan1121 已提交
633 634
    ctgWarn("invalid db vgVersion:%d, dbName:%s", dbInfo->vgVersion, dbName);

D
dapan1121 已提交
635
    if (pCatalog->dbCache.cache) {
D
dapan1121 已提交
636 637 638
      CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbName, dbInfo));
      
      CTG_ERR_JRET(taosHashRemove(pCatalog->dbCache.cache, dbName, strlen(dbName)));
D
dapan1121 已提交
639 640 641
    }
    
    ctgWarn("remove db [%s] from cache", dbName);
D
dapan1121 已提交
642
    goto _return;
D
dapan1121 已提交
643
  }
D
dapan1121 已提交
644

D
dapan1121 已提交
645
  if (NULL == pCatalog->dbCache.cache) {
D
dapan1121 已提交
646
    pCatalog->dbCache.cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
D
dapan1121 已提交
647
    if (NULL == pCatalog->dbCache.cache) {
D
dapan1121 已提交
648
      ctgError("init hash[%d] for db cache failed", CTG_DEFAULT_CACHE_DB_NUMBER);
D
dapan1121 已提交
649
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
650
    }
651
  } else {
D
dapan1121 已提交
652
    CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbName, dbInfo));
D
dapan1121 已提交
653 654 655 656
  }

  if (taosHashPut(pCatalog->dbCache.cache, dbName, strlen(dbName), dbInfo, sizeof(*dbInfo)) != 0) {
    ctgError("push to vgroup hash cache failed");
D
dapan1121 已提交
657
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
658 659
  }

D
dapan1121 已提交
660 661 662 663 664 665 666 667 668 669 670 671
  ctgDebug("dbName:%s vgroup updated, vgVersion:%d", dbName, dbInfo->vgVersion);

  dbInfo->vgInfo = NULL;

_return:

  if (dbInfo && dbInfo->vgInfo) {
    taosHashCleanup(dbInfo->vgInfo);
    dbInfo->vgInfo = NULL;
  }
  
  CTG_RET(code);
D
dapan1121 已提交
672 673
}

H
Haojun Liao 已提交
674 675
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
  return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta);
D
dapan1121 已提交
676
}
D
dapan1121 已提交
677

678 679
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName) {
  if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) {
D
dapan1121 已提交
680
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan 已提交
681 682
  }

D
dapan1121 已提交
683
  SVgroupInfo vgroupInfo = {0};
D
dapan1121 已提交
684
  int32_t code = 0;
H
Haojun Liao 已提交
685

686
  CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo));
D
dapan1121 已提交
687

D
dapan1121 已提交
688 689
  STableMetaOutput output = {0};
  
690
  CTG_ERR_RET(ctgGetTableMetaFromVnode(pCatalog, pTransporter, pMgmtEps, pTableName, &vgroupInfo, &output));
D
dapan1121 已提交
691

D
dapan1121 已提交
692
  //CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pTableName, &output));
D
dapan1121 已提交
693

D
dapan1121 已提交
694 695 696
  CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, &output));

_return:
D
dapan1121 已提交
697
  tfree(output.tbMeta);
D
dapan1121 已提交
698
  CTG_RET(code);
699
}
700

H
Haojun Liao 已提交
701 702
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
  return ctgGetTableMetaImpl(pCatalog, pRpc, pMgmtEps, pTableName, true, pTableMeta);
D
dapan1121 已提交
703 704
}

H
Haojun Liao 已提交
705 706
int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgroupList) {
  if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pVgroupList) {
D
dapan1121 已提交
707
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
708 709 710 711 712
  }
  
  STableMeta *tbMeta = NULL;
  int32_t code = 0;
  SVgroupInfo vgroupInfo = {0};
D
dapan1121 已提交
713 714 715 716
  SDBVgroupInfo* dbVgroup = NULL;
  SArray *vgList = NULL;

  *pVgroupList = NULL;
D
dapan1121 已提交
717
  
H
Haojun Liao 已提交
718
  CTG_ERR_JRET(catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, pTableName, &tbMeta));
D
dapan1121 已提交
719

H
Haojun Liao 已提交
720 721
  char db[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, db);
H
Haojun Liao 已提交
722
  CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, db, false, &dbVgroup));
D
dapan 已提交
723

724
  if (tbMeta->tableType == TSDB_SUPER_TABLE) {
D
dapan1121 已提交
725
    CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, dbVgroup, pVgroupList));
D
dapan1121 已提交
726
  } else {
727
    int32_t vgId = tbMeta->vgId;
D
dapan1121 已提交
728
    if (NULL == taosHashGetClone(dbVgroup->vgInfo, &vgId, sizeof(vgId), &vgroupInfo)) {
729
      ctgError("vgId[%d] not found in vgroup list", vgId);
D
dapan 已提交
730
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);    
731
    }
D
dapan1121 已提交
732

D
dapan1121 已提交
733 734
    vgList = taosArrayInit(1, sizeof(SVgroupInfo));
    if (NULL == vgList) {
D
dapan 已提交
735 736 737 738
      ctgError("taosArrayInit failed");
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);    
    }

D
dapan1121 已提交
739
    if (NULL == taosArrayPush(vgList, &vgroupInfo)) {
D
dapan1121 已提交
740 741 742
      ctgError("push vgroupInfo to array failed");
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
    }
D
dapan 已提交
743

D
dapan1121 已提交
744 745 746
    *pVgroupList = vgList;
    vgList = NULL;
  }
D
dapan 已提交
747

D
dapan1121 已提交
748 749
_return:
  tfree(tbMeta);
D
dapan 已提交
750

D
dapan1121 已提交
751 752 753 754 755 756 757 758 759
  if (dbVgroup) {
    CTG_UNLOCK(CTG_READ, &dbVgroup->lock);
    taosHashRelease(pCatalog->dbCache.cache, dbVgroup);
  }

  if (vgList) {
    taosArrayDestroy(vgList);
    vgList = NULL;
  }
D
dapan1121 已提交
760
  
D
dapan1121 已提交
761
  CTG_RET(code);
D
dapan1121 已提交
762 763 764
}


H
Haojun Liao 已提交
765
int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter, const SEpSet *pMgmtEps, const SName *pTableName, SVgroupInfo *pVgroup) {
D
dapan1121 已提交
766
  SDBVgroupInfo* dbInfo = NULL;
D
dapan1121 已提交
767 768
  int32_t code = 0;

H
Haojun Liao 已提交
769 770
  char db[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, db);
D
dapan1121 已提交
771

H
Haojun Liao 已提交
772
  CTG_ERR_RET(ctgGetDBVgroup(pCatalog, pTransporter, pMgmtEps, db, false, &dbInfo));
D
dapan1121 已提交
773

774
  CTG_ERR_JRET(ctgGetVgInfoFromHashValue(dbInfo, pTableName, pVgroup));
D
dapan1121 已提交
775

D
dapan1121 已提交
776 777 778 779 780 781
_return:

  if (dbInfo) {
    CTG_UNLOCK(CTG_READ, &dbInfo->lock);  
    taosHashRelease(pCatalog->dbCache.cache, dbInfo);
  }
D
dapan1121 已提交
782

D
dapan1121 已提交
783
  CTG_RET(code);
D
dapan1121 已提交
784 785 786
}


D
dapan1121 已提交
787 788
int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp) {
  if (NULL == pCatalog || NULL == pRpc  || NULL == pMgmtEps || NULL == pReq || NULL == pRsp) {
D
dapan1121 已提交
789
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan 已提交
790
  }
D
dapan1121 已提交
791 792 793 794 795

  int32_t code = 0;

  if (pReq->pTableName) {
    int32_t tbNum = (int32_t)taosArrayGetSize(pReq->pTableName);
D
dapan1121 已提交
796 797 798 799
    if (tbNum <= 0) {
      ctgError("empty table name list");
      CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
    }
H
Haojun Liao 已提交
800

D
dapan1121 已提交
801 802 803 804
    pRsp->pTableMeta = taosArrayInit(tbNum, POINTER_BYTES);
    if (NULL == pRsp->pTableMeta) {
      ctgError("taosArrayInit num[%d] failed", tbNum);
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
805 806 807 808 809 810
    }
    
    for (int32_t i = 0; i < tbNum; ++i) {
      SName *name = taosArrayGet(pReq->pTableName, i);
      STableMeta *pTableMeta = NULL;
      
H
Haojun Liao 已提交
811
      CTG_ERR_JRET(catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, name, &pTableMeta));
D
dapan1121 已提交
812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832

      if (NULL == taosArrayPush(pRsp->pTableMeta, &pTableMeta)) {
        ctgError("taosArrayPush failed, idx:%d", i);
        tfree(pTableMeta);
        CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
      }
    }
  }

  return TSDB_CODE_SUCCESS;

_return:  

  if (pRsp->pTableMeta) {
    int32_t aSize = taosArrayGetSize(pRsp->pTableMeta);
    for (int32_t i = 0; i < aSize; ++i) {
      STableMeta *pMeta = taosArrayGetP(pRsp->pTableMeta, i);
      tfree(pMeta);
    }
    
    taosArrayDestroy(pRsp->pTableMeta);
D
dapan1121 已提交
833
    pRsp->pTableMeta = NULL;
D
dapan1121 已提交
834
  }
D
dapan 已提交
835
  
D
dapan1121 已提交
836
  CTG_RET(code);
837
}
D
dapan 已提交
838

D
dapan1121 已提交
839 840
int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray* pQnodeList) {
  if (NULL == pCatalog || NULL == pRpc  || NULL == pMgmtEps || NULL == pQnodeList) {
D
dapan 已提交
841 842 843 844 845 846 847 848
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }


  return TSDB_CODE_SUCCESS;
}


D
dapan 已提交
849 850 851 852 853 854 855 856 857
void catalogDestroy(void) {
  if (ctgMgmt.pCluster) {
    taosHashCleanup(ctgMgmt.pCluster); //TBD
    ctgMgmt.pCluster = NULL;
  }
}