/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

D
dapan1121 已提交
16
#include "trpc.h"
D
dapan1121 已提交
17
#include "query.h"
D
dapan1121 已提交
18
#include "tname.h"
H
Haojun Liao 已提交
19
#include "catalogInt.h"
20

D
dapan1121 已提交
21 22
// File-scope catalog manager state: holds the effective cache configuration
// (cfg) and the cluster-id -> SCatalog* hash (pCluster) set up by catalogInit().
SCatalogMgmt ctgMgmt = {0};

D
dapan1121 已提交
23
/*
 * Look up the vgroup info of database `dbName` in the catalog's db cache.
 *
 * On a hit, *dbInfo points at the cached entry and *inCache is true. The entry
 * is handed back still acquired from the hash (taosHashAcquire) and with its
 * read lock held, so the caller is responsible for CTG_UNLOCK(CTG_READ, ...)
 * followed by taosHashRelease(). On a miss, *inCache is false and *dbInfo is
 * not written.
 *
 * An entry whose vgInfo is NULL is released and the lookup is repeated.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, SDBVgroupInfo **dbInfo, bool *inCache) {
  if (NULL == pCatalog->dbCache.cache) {
    *inCache = false;
    ctgWarn("no db cache");
    return TSDB_CODE_SUCCESS;
  }

  SDBVgroupInfo *entry = NULL;

  for (;;) {
    entry = taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName));
    if (NULL == entry) {
      *inCache = false;
      ctgWarn("no db cache, dbName:%s", dbName);
      return TSDB_CODE_SUCCESS;
    }

    CTG_LOCK(CTG_READ, &entry->lock);
    if (NULL != entry->vgInfo) {
      // usable entry: leave the loop with the read lock still held
      break;
    }

    // entry is present but carries no vgroup hash; drop it and look again
    CTG_UNLOCK(CTG_READ, &entry->lock);
    taosHashRelease(pCatalog->dbCache.cache, entry);
    ctgWarn("db cache vgInfo is NULL, dbName:%s", dbName);
  }

  *dbInfo = entry;
  *inCache = true;

  return TSDB_CODE_SUCCESS;
}



/*
 * Send a TDMT_MND_USE_DB request built from `input` to the endpoints in
 * `pMgmtEps` and decode the response into `out`.
 *
 * Returns TSDB_CODE_SUCCESS, the code carried by the rpc response when it is
 * not success, or whatever the message build/parse callbacks report.
 *
 * NOTE(review): ownership of `msg` and of rpcRsp.pCont after rpcSendRecv() is
 * not visible in this file -- confirm neither needs to be released here.
 */
int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SBuildUseDBInput *input, SUseDbOutput *out) {
  char   *msg = NULL;
  int32_t msgLen = 0;

  CTG_ERR_RET(queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)](input, &msg, 0, &msgLen));

  SRpcMsg rpcMsg = {
      .msgType = TDMT_MND_USE_DB,
      .pCont   = msg,
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};

  rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
  if (TSDB_CODE_SUCCESS != rpcRsp.code) {
    ctgError("error rsp for use db, code:%x", rpcRsp.code);
    CTG_ERR_RET(rpcRsp.code);
  }

  CTG_ERR_RET(queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)](out, rpcRsp.pCont, rpcRsp.contLen));

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
86 87


H
Haojun Liao 已提交
88
/*
 * Fetch a private copy of a table's meta from the table cache.
 *
 * @param pCatalog    catalog whose tableCache is searched
 * @param pTableName  table to look up (keyed by its full name)
 * @param pTableMeta  out: heap copy of the cached meta, or NULL on a miss or
 *                    an error; the caller frees it
 * @param exist       out: 1 when a usable meta was returned, otherwise 0
 *
 * For a child table, the cached entry is completed with the super table's
 * data: the copy is grown to hold the super table's schema, and everything
 * from `sversion` onwards is copied from the super table entry found through
 * stableCache. If the super table is not cached, the lookup is reported as a
 * miss (*exist == 0, TSDB_CODE_SUCCESS).
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_CTG_INTERNAL_ERROR when the stable
 * cache entry carries a different suid, or TSDB_CODE_CTG_MEM_ERROR when the
 * copy cannot be grown.
 */
int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableName, STableMeta** pTableMeta, int32_t *exist) {
  if (NULL == pCatalog->tableCache.cache) {
    *exist = 0;
    return TSDB_CODE_SUCCESS;
  }

  char tbFullName[TSDB_TABLE_FNAME_LEN];
  tNameExtractFullName(pTableName, tbFullName);

  *pTableMeta = NULL;

  size_t sz = 0;
  STableMeta *tbMeta = taosHashGetCloneExt(pCatalog->tableCache.cache, tbFullName, strlen(tbFullName), NULL, (void **)pTableMeta, &sz);

  if (NULL == *pTableMeta) {
    *exist = 0;
    return TSDB_CODE_SUCCESS;
  }

  *exist = 1;

  if (tbMeta->tableType != TSDB_CHILD_TABLE) {
    return TSDB_CODE_SUCCESS;
  }

  CTG_LOCK(CTG_READ, &pCatalog->tableCache.stableLock);

  STableMeta **stbMeta = taosHashGet(pCatalog->tableCache.stableCache, &tbMeta->suid, sizeof(tbMeta->suid));
  if (NULL == stbMeta || NULL == *stbMeta) {
    CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock);
    qError("no stable:%"PRIx64 " meta in cache", tbMeta->suid);
    tfree(*pTableMeta);
    *pTableMeta = NULL;
    *exist = 0;
    return TSDB_CODE_SUCCESS;
  }

  if ((*stbMeta)->suid != tbMeta->suid) {
    // log while the stable lock is still held and before anything is freed,
    // so both suids are read from live memory
    ctgError("stable cache error, expected suid:%"PRId64 ",actual suid:%"PRId64, tbMeta->suid, (*stbMeta)->suid);
    CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock);
    tfree(*pTableMeta);
    *pTableMeta = NULL;
    *exist = 0;
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  int32_t metaSize = sizeof(STableMeta) + ((*stbMeta)->tableInfo.numOfTags + (*stbMeta)->tableInfo.numOfColumns) * sizeof(SSchema);

  // keep the old pointer until realloc succeeds; on failure the old block is
  // still allocated and has to be freed here
  STableMeta *grown = realloc(*pTableMeta, metaSize);
  if (NULL == grown) {
    CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock);
    ctgError("realloc size[%d] failed", metaSize);
    tfree(*pTableMeta);
    *pTableMeta = NULL;
    *exist = 0;
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
  }
  *pTableMeta = grown;

  // the leading SCTableMeta part stays the child's own; the rest comes from the super table
  memcpy(&(*pTableMeta)->sversion, &(*stbMeta)->sversion, metaSize - sizeof(SCTableMeta));

  CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock);

  return TSDB_CODE_SUCCESS;
}

/*
 * Fill `epSet` from a vgroup's endpoint list: the endpoint count plus the fqdn
 * and port of each of the first numOfEps endpoints. inUse is reset to 0.
 * Slots beyond numOfEps are not written.
 */
void ctgGenEpSet(SEpSet *epSet, SVgroupInfo *vgroupInfo) {
  epSet->numOfEps = vgroupInfo->numOfEps;
  epSet->inUse = 0;

  for (int32_t ep = 0; ep < vgroupInfo->numOfEps; ++ep) {
    memcpy(&epSet->fqdn[ep], &vgroupInfo->epAddr[ep].fqdn, sizeof(epSet->fqdn[ep]));
    memcpy(&epSet->port[ep], &vgroupInfo->epAddr[ep].port, sizeof(epSet->port[ep]));
  }
}

H
Haojun Liao 已提交
156 157
/*
 * Request a table's meta from the mnode (TDMT_MND_STB_META), addressing the
 * table by its full name, and decode the response into `output`.
 *
 * Returns TSDB_CODE_CTG_INVALID_INPUT if any argument is NULL, the code
 * carried by the rpc response when it is not success, or whatever the message
 * build/parse callbacks report.
 *
 * NOTE(review): ownership of `msg` and of rpcRsp.pCont after rpcSendRecv() is
 * not visible in this file -- confirm neither needs to be released here.
 */
int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, STableMetaOutput* output) {
  if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == output) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  char tbFullName[TSDB_TABLE_FNAME_LEN];
  tNameExtractFullName(pTableName, tbFullName);

  SBuildTableMetaInput bInput = {.vgId = 0, .dbName = NULL, .tableFullName = tbFullName};
  char   *msg = NULL;
  int32_t msgLen = 0;

  CTG_ERR_RET(queryBuildMsg[TMSG_INDEX(TDMT_MND_STB_META)](&bInput, &msg, 0, &msgLen));

  SRpcMsg rpcMsg = {
      .msgType = TDMT_MND_STB_META,
      .pCont   = msg,
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};

  rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);

  if (TSDB_CODE_SUCCESS != rpcRsp.code) {
    ctgError("error rsp for table meta, code:%x", rpcRsp.code);
    CTG_ERR_RET(rpcRsp.code);
  }

  CTG_ERR_RET(queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_STB_META)](output, rpcRsp.pCont, rpcRsp.contLen));

  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
192 193
/*
 * Request a table's meta from the vnode group described by `vgroupInfo`
 * (TDMT_VND_TABLE_META) and decode the response into `output`.
 *
 * The request is sent to an endpoint set derived from `vgroupInfo`, not to
 * `pMgmtEps` (which is only validated here).
 *
 * Returns TSDB_CODE_CTG_INVALID_INPUT if any argument is NULL, the code
 * carried by the rpc response when it is not success, or whatever the message
 * build/parse callbacks report.
 *
 * NOTE(review): ownership of `msg` and of rpcRsp.pCont after rpcSendRecv() is
 * not visible in this file -- confirm neither needs to be released here.
 */
int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) {
  if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == vgroupInfo || NULL == output) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  char dbFullName[TSDB_DB_FNAME_LEN];
  tNameGetFullDbName(pTableName, dbFullName);

  SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .dbName = dbFullName, .tableFullName = pTableName->tname};
  char   *msg = NULL;
  int32_t msgLen = 0;

  CTG_ERR_RET(queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)](&bInput, &msg, 0, &msgLen));

  SRpcMsg rpcMsg = {
      .msgType = TDMT_VND_TABLE_META,
      .pCont   = msg,
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};

  // ctgGenEpSet() only writes the first numOfEps slots, so start from a zeroed
  // set rather than handing uninitialised stack bytes to the rpc layer
  SEpSet epSet = {0};
  ctgGenEpSet(&epSet, vgroupInfo);

  rpcSendRecv(pRpc, &epSet, &rpcMsg, &rpcRsp);

  if (TSDB_CODE_SUCCESS != rpcRsp.code) {
    ctgError("error rsp for table meta, code:%x", rpcRsp.code);
    CTG_ERR_RET(rpcRsp.code);
  }

  CTG_ERR_RET(queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)](output, rpcRsp.pCont, rpcRsp.contLen));

  return TSDB_CODE_SUCCESS;
}


231 232
/*
 * Select the table-name hash function for a database's hash method.
 * Every method currently maps to MurmurHash3_32, so `hashMethod` does not
 * influence the result. Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgGetHashFunction(int8_t hashMethod, tableNameHashFp *fp) {
  (void)hashMethod;  // no method-specific choice exists yet

  *fp = MurmurHash3_32;

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
241
/*
 * Copy every vgroup of `dbInfo` into a newly created SArray of SVgroupInfo.
 *
 * On success *vgroupList receives the array and the caller owns it. On failure
 * the partially built array is destroyed and *vgroupList is not written.
 * pCatalog, pRpc and pMgmtEps are not used by this function.
 *
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_CTG_MEM_ERROR.
 */
int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, SDBVgroupInfo *dbInfo, SArray** vgroupList) {
  int32_t code = 0;

  SArray *vgList = taosArrayInit(taosHashGetSize(dbInfo->vgInfo), sizeof(SVgroupInfo));
  if (NULL == vgList) {
    ctgError("taosArrayInit failed");
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
  }

  void *pIter = taosHashIterate(dbInfo->vgInfo, NULL);
  while (pIter) {
    // each hash value is treated as one SVgroupInfo and copied by value into the array
    if (NULL == taosArrayPush(vgList, pIter)) {
      ctgError("taosArrayPush failed");
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
    }

    pIter = taosHashIterate(dbInfo->vgInfo, pIter);
  }

  *vgroupList = vgList;

  return TSDB_CODE_SUCCESS;

_return:

  taosArrayDestroy(vgList);

  CTG_RET(code);
}

H
Haojun Liao 已提交
280
/*
 * Find the vgroup responsible for a table by hashing the table's full name
 * and locating the vgroup whose [hashBegin, hashEnd] range contains the hash.
 *
 * @param dbInfo      vgroup info of the table's database
 * @param pTableName  table to place
 * @param pVgroup     out: copy of the matching vgroup; written only on success
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_DB_NOT_SELECTED when the database
 * has no vgroups, or TSDB_CODE_CTG_INTERNAL_ERROR when no range matches.
 */
int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup) {
  int32_t code = 0;

  int32_t vgNum = taosHashGetSize(dbInfo->vgInfo);
  char db[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, db);

  if (vgNum <= 0) {
    ctgError("db[%s] vgroup cache invalid, vgroup number:%d", db, vgNum);
    CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED);
  }

  tableNameHashFp fp = NULL;
  SVgroupInfo *vgInfo = NULL;

  CTG_ERR_JRET(ctgGetHashFunction(dbInfo->hashMethod, &fp));

  char tbFullName[TSDB_TABLE_FNAME_LEN];
  tNameExtractFullName(pTableName, tbFullName);

  uint32_t hashValue = (*fp)(tbFullName, (uint32_t)strlen(tbFullName));

  void *pIter = taosHashIterate(dbInfo->vgInfo, NULL);
  while (pIter) {
    vgInfo = pIter;
    if (hashValue >= vgInfo->hashBegin && hashValue <= vgInfo->hashEnd) {
      break;
    }

    pIter = taosHashIterate(dbInfo->vgInfo, pIter);
    vgInfo = NULL;  // stays NULL if the iteration ends without a match
  }

  if (NULL == vgInfo) {
    ctgError("no hash range found for hashvalue[%u]", hashValue);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  *pVgroup = *vgInfo;

_return:

  // hand back the code recorded on the failure paths; a constant success here
  // would report "found" while *pVgroup was never written
  CTG_RET(code);
}

H
Haojun Liao 已提交
325 326
/*
 * Common implementation behind the table-meta getters.
 *
 * Unless `forceUpdate` is set, the cache is consulted first and a hit is
 * returned directly. Otherwise (or on a miss) the meta is refreshed through
 * catalogRenewTableMeta() and then read back from the cache.
 *
 * On success *pTableMeta holds the copy produced by ctgGetTableMetaFromCache().
 *
 * Returns TSDB_CODE_CTG_INVALID_INPUT for a NULL argument,
 * TSDB_CODE_CTG_INTERNAL_ERROR if the meta is still absent from the cache
 * after a successful refresh, or the error of a failing step.
 */
int32_t ctgGetTableMetaImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, bool forceUpdate, STableMeta** pTableMeta) {
  if (!(pCatalog && pRpc && pMgmtEps && pTableName && pTableMeta)) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  int32_t found = 0;

  if (!forceUpdate) {
    CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pTableName, pTableMeta, &found));
    if (found) {
      return TSDB_CODE_SUCCESS;
    }
  }

  // cache bypassed or missed: refresh, then read the refreshed entry back
  CTG_ERR_RET(catalogRenewTableMeta(pCatalog, pRpc, pMgmtEps, pTableName));
  CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pTableName, pTableMeta, &found));

  if (!found) {
    ctgError("get table meta from cache failed, but fetch succeed");
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
353
/*
 * Store the result of a table-meta response in the catalog caches.
 *
 * `output` carries either one meta (metaNum == 1: tbMeta/tbFname) or two
 * (metaNum == 2: a child table head in ctbMeta/ctbFname plus its super table
 * in tbMeta/tbFname). The table cache and the stable cache are created on
 * first use.
 *
 * A super table is stored in the table cache under its name, and stableCache
 * additionally maps its suid to the address of that stored entry. Both steps
 * happen under the stable write lock.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_CTG_INTERNAL_ERROR for an inconsistent
 * response, or TSDB_CODE_CTG_MEM_ERROR when a cache cannot be created or
 * written.
 */
int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output) {
  int32_t code = 0;

  if (output->metaNum != 1 && output->metaNum != 2) {
    ctgError("invalid table meta number[%d] got from meta rsp", output->metaNum);
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  if (NULL == output->tbMeta) {
    ctgError("no valid table meta got from meta rsp");
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  if (NULL == pCatalog->tableCache.cache) {
    pCatalog->tableCache.cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
    if (NULL == pCatalog->tableCache.cache) {
      ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum);
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }
  }

  if (NULL == pCatalog->tableCache.stableCache) {
    pCatalog->tableCache.stableCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
    if (NULL == pCatalog->tableCache.stableCache) {
      ctgError("init hash[%d] for stablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum);
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }
  }

  if (output->metaNum == 2) {
    // check the companion meta before caching anything, so a malformed
    // response does not leave a child entry behind without its super table
    if (TSDB_SUPER_TABLE != output->tbMeta->tableType) {
      ctgError("table type[%d] error, expected:%d", output->tbMeta->tableType, TSDB_SUPER_TABLE);
      CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
    }

    if (taosHashPut(pCatalog->tableCache.cache, output->ctbFname, strlen(output->ctbFname), &output->ctbMeta, sizeof(output->ctbMeta)) != 0) {
      ctgError("push ctable[%s] to table cache failed", output->ctbFname);
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }
  }

  int32_t tbSize = sizeof(*output->tbMeta) + sizeof(SSchema) * (output->tbMeta->tableInfo.numOfColumns + output->tbMeta->tableInfo.numOfTags);

  if (TSDB_SUPER_TABLE == output->tbMeta->tableType) {
    CTG_LOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
    if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) {
      CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
      ctgError("push table[%s] to table cache failed", output->tbFname);
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }

    // stableCache stores a pointer to the entry just written to the table cache
    // NOTE(review): this relies on that entry's address staying valid while the
    // suid mapping exists -- confirm against the hash implementation
    STableMeta *tbMeta = taosHashGet(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname));
    if (taosHashPut(pCatalog->tableCache.stableCache, &output->tbMeta->suid, sizeof(output->tbMeta->suid), &tbMeta, POINTER_BYTES) != 0) {
      CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
      ctgError("push suid[%"PRIu64"] to stable cache failed", output->tbMeta->suid);
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }
    CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
  } else {
    if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) {
      ctgError("push table[%s] to table cache failed", output->tbFname);
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }
  }

  CTG_RET(code);
}

D
dapan1121 已提交
421

D
dapan1121 已提交
422 423
/*
 * Obtain the cached vgroup info of database `dbName`, fetching it from the
 * mnode when it is not cached or when `forceUpdate` is non-zero.
 *
 * On success *dbInfo is the entry returned by ctgGetDBVgroupFromCache(), i.e.
 * still acquired and read-locked; the caller unlocks and releases it.
 *
 * The fetch / update / re-read sequence is repeated until the re-read hits.
 * Returns TSDB_CODE_SUCCESS or the error of a failing step.
 */
int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo** dbInfo) {
  bool inCache = false;

  if (0 == forceUpdate) {
    CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &inCache));
    if (inCache) {
      return TSDB_CODE_SUCCESS;
    }
  }

  SBuildUseDBInput useDbReq = {0};
  SUseDbOutput     useDbRsp = {0};

  // bounded copy with explicit termination
  strncpy(useDbReq.db, dbName, sizeof(useDbReq.db));
  useDbReq.db[sizeof(useDbReq.db) - 1] = 0;
  useDbReq.vgVersion = CTG_DEFAULT_INVALID_VERSION;

  do {
    CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &useDbReq, &useDbRsp));

    CTG_ERR_RET(catalogUpdateDBVgroup(pCatalog, dbName, &useDbRsp.dbVgroup));

    CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &inCache));

    if (!inCache) {
      ctgWarn("get db vgroup from cache failed, db:%s", dbName);
    }
  } while (!inCache);

  return TSDB_CODE_SUCCESS;
}


/*
 * Prepare the cached entry of `dbName` for replacement by `dbInfo`.
 *
 * If an entry exists and `dbInfo` carries a strictly newer vgVersion, the
 * entry's vgroup hash is destroyed and its vgInfo set to NULL. If the incoming
 * version is not newer, the entry is left as it is. All of this happens under
 * the entry's write lock.
 *
 * Always returns TSDB_CODE_SUCCESS, so the return value does not tell the
 * caller which of the two cases applied.
 */
int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) {
  SDBVgroupInfo *cached = (SDBVgroupInfo *)taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName));
  if (NULL == cached) {
    return TSDB_CODE_SUCCESS;
  }

  CTG_LOCK(CTG_WRITE, &cached->lock);

  if (dbInfo->vgVersion > cached->vgVersion) {
    if (cached->vgInfo) {
      ctgInfo("dbName:%s vg will be cleanup", dbName);
      taosHashCleanup(cached->vgInfo);
      cached->vgInfo = NULL;
    }
  } else {
    ctgInfo("dbName:%s vg will not update, vgVersion:%d , current:%d", dbName, dbInfo->vgVersion, cached->vgVersion);
  }

  CTG_UNLOCK(CTG_WRITE, &cached->lock);
  taosHashRelease(pCatalog->dbCache.cache, cached);

  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
485
/*
 * One-time initialisation of the catalog module.
 *
 * Copies `cfg` (if given) into the global configuration, substitutes the
 * default for a db/table cache size that is zero or unspecified, and creates
 * the cluster hash.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_CTG_INVALID_INPUT when the module is
 * already initialised, or TSDB_CODE_CTG_INTERNAL_ERROR when the cluster hash
 * cannot be created.
 */
int32_t catalogInit(SCatalogCfg *cfg) {
  if (ctgMgmt.pCluster) {
    ctgError("catalog already init");
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  if (cfg) {
    memcpy(&ctgMgmt.cfg, cfg, sizeof(*cfg));
  }

  // without a caller config both sizes are defaulted unconditionally;
  // with one, only the sizes left at zero are
  if (NULL == cfg || 0 == ctgMgmt.cfg.maxDBCacheNum) {
    ctgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
  }

  if (NULL == cfg || 0 == ctgMgmt.cfg.maxTblCacheNum) {
    ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TABLEMETA_NUMBER;
  }

  ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
  if (NULL == ctgMgmt.pCluster) {
    CTG_ERR_LRET(TSDB_CODE_CTG_INTERNAL_ERROR, "init %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER);
  }

  return TSDB_CODE_SUCCESS;
}

514 515
int32_t catalogGetHandle(uint64_t clusterId, struct SCatalog** catalogHandle) {
  if (NULL == catalogHandle) {
D
dapan1121 已提交
516
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan 已提交
517 518 519 520
  }

  if (NULL == ctgMgmt.pCluster) {
    ctgError("cluster cache are not ready");
D
dapan1121 已提交
521
    CTG_ERR_RET(TSDB_CODE_CTG_NOT_READY);
D
dapan 已提交
522 523
  }

524
  SCatalog **ctg = (SCatalog **)taosHashGet(ctgMgmt.pCluster, (char*)&clusterId, sizeof(clusterId));
D
dapan 已提交
525

D
catalog  
dapan1121 已提交
526 527
  if (ctg && (*ctg)) {
    *catalogHandle = *ctg;
D
dapan1121 已提交
528
    return TSDB_CODE_SUCCESS;
D
dapan 已提交
529 530
  }

D
catalog  
dapan1121 已提交
531
  SCatalog *clusterCtg = calloc(1, sizeof(SCatalog));
D
dapan 已提交
532
  if (NULL == clusterCtg) {
D
catalog  
dapan1121 已提交
533
    ctgError("calloc %d failed", (int32_t)sizeof(SCatalog));
D
dapan1121 已提交
534
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan 已提交
535 536
  }

537 538
  if (taosHashPut(ctgMgmt.pCluster, &clusterId, sizeof(clusterId), &clusterCtg, POINTER_BYTES)) {
    ctgError("put cluster %"PRIx64" cache to hash failed", clusterId);
D
dapan 已提交
539
    tfree(clusterCtg);
D
dapan1121 已提交
540
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan 已提交
541
  }
D
dapan1121 已提交
542 543

  *catalogHandle = clusterCtg;
D
dapan 已提交
544
  
D
dapan1121 已提交
545
  return TSDB_CODE_SUCCESS;
D
dapan 已提交
546 547
}

D
dapan1121 已提交
548 549
int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version) {
  if (NULL == pCatalog || NULL == dbName || NULL == version) {
D
dapan1121 已提交
550
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
551 552 553 554 555 556 557
  }

  if (NULL == pCatalog->dbCache.cache) {
    *version = CTG_DEFAULT_INVALID_VERSION;
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
558
  SDBVgroupInfo * dbInfo = taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName));
D
dapan1121 已提交
559 560 561 562 563
  if (NULL == dbInfo) {
    *version = CTG_DEFAULT_INVALID_VERSION;
    return TSDB_CODE_SUCCESS;
  }

564
  *version = dbInfo->vgVersion;
D
dapan1121 已提交
565
  taosHashRelease(pCatalog->dbCache.cache, dbInfo);
D
dapan1121 已提交
566 567 568 569

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619
/*
 * Return all vgroups of database `dbName` as a new SArray of SVgroupInfo.
 *
 * The vgroup info is taken from the cache, or fetched from the mnode when it
 * is missing or `forceUpdate` is non-zero. On success *vgroupList receives the
 * array and the caller owns it; on failure *vgroupList is not written.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_CTG_INVALID_INPUT for a NULL argument,
 * or the error of a failing step.
 */
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SArray** vgroupList) {
  if (NULL == pCatalog || NULL == dbName || NULL == pRpc || NULL == pMgmtEps || NULL == vgroupList) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  SDBVgroupInfo* db = NULL;
  int32_t code = 0;

  CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, dbName, forceUpdate, &db));

  // building the array is the same job ctgGetVgInfoFromDB() already does
  CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, db, vgroupList));

_return:

  // ctgGetDBVgroup() hands the entry back read-locked and acquired
  if (db) {
    CTG_UNLOCK(CTG_READ, &db->lock);
    taosHashRelease(pCatalog->dbCache.cache, db);
  }

  CTG_RET(code);
}


D
dapan1121 已提交
620
/*
 * Install `dbInfo` as the cached vgroup info of database `dbName`.
 *
 * `dbInfo` must carry a non-empty vgInfo hash and a non-negative vgVersion;
 * anything else is rejected with TSDB_CODE_CTG_MEM_ERROR. The db cache is
 * created on first use.
 *
 * Ownership: on success the cache entry takes over dbInfo->vgInfo. On every
 * failure path dbInfo->vgInfo is destroyed here. In both cases
 * dbInfo->vgInfo is NULL when the function returns.
 *
 * NOTE(review): ctgValidateAndRemoveDb() returns success both when it cleared
 * the old entry and when it decided the incoming version is not newer, and
 * the taosHashPut() below runs in either case -- confirm that overwriting in
 * the "not newer" case is intended.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_CTG_INVALID_INPUT for a NULL argument,
 * or TSDB_CODE_CTG_MEM_ERROR.
 */
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) {
  int32_t code = 0;

  if (NULL == pCatalog || NULL == dbName || NULL == dbInfo) {
    CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  // negative versions are rejected here, so no later code has to handle them
  if (NULL == dbInfo->vgInfo || dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgInfo) <= 0) {
    ctgError("invalid db vg, dbName:%s", dbName);
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
  }

  if (NULL == pCatalog->dbCache.cache) {
    pCatalog->dbCache.cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
    if (NULL == pCatalog->dbCache.cache) {
      // report the size that was actually requested
      ctgError("init hash[%d] for db cache failed", ctgMgmt.cfg.maxDBCacheNum);
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
    }
  } else {
    CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbName, dbInfo));
  }

  if (taosHashPut(pCatalog->dbCache.cache, dbName, strlen(dbName), dbInfo, sizeof(*dbInfo)) != 0) {
    ctgError("push to vgroup hash cache failed");
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
  }

  ctgDebug("dbName:%s vgroup updated, vgVersion:%d", dbName, dbInfo->vgVersion);

  // the cache entry now refers to the vgroup hash; keep the cleanup below away from it
  dbInfo->vgInfo = NULL;

_return:

  if (dbInfo && dbInfo->vgInfo) {
    taosHashCleanup(dbInfo->vgInfo);
    dbInfo->vgInfo = NULL;
  }

  CTG_RET(code);
}

H
Haojun Liao 已提交
674 675
/*
 * Get a table's meta, served from the cache when present and fetched
 * otherwise. See ctgGetTableMetaImpl() for the result and error contract.
 */
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
  const bool forceUpdate = false;  // a cache hit is acceptable

  return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, forceUpdate, pTableMeta);
}
D
dapan1121 已提交
677

H
Haojun Liao 已提交
678 679
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName) {
  if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName) {
D
dapan1121 已提交
680
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan 已提交
681 682
  }

D
dapan1121 已提交
683
  SVgroupInfo vgroupInfo = {0};
D
dapan1121 已提交
684
  int32_t code = 0;
H
Haojun Liao 已提交
685 686

  CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pRpc, pMgmtEps, pTableName, &vgroupInfo));
D
dapan1121 已提交
687

D
dapan1121 已提交
688 689
  STableMetaOutput output = {0};
  
D
dapan1121 已提交
690
  CTG_ERR_RET(ctgGetTableMetaFromVnode(pCatalog, pRpc, pMgmtEps, pTableName, &vgroupInfo, &output));
D
dapan1121 已提交
691

D
dapan1121 已提交
692
  //CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pTableName, &output));
D
dapan1121 已提交
693

D
dapan1121 已提交
694 695 696
  CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, &output));

_return:
D
dapan1121 已提交
697

D
dapan1121 已提交
698 699
  tfree(output.tbMeta);
  
D
dapan1121 已提交
700
  CTG_RET(code);
701
}
702

H
Haojun Liao 已提交
703 704
/*
 * Get a table's meta after unconditionally refreshing it, bypassing the
 * initial cache lookup. See ctgGetTableMetaImpl() for the result and error
 * contract.
 */
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
  const bool forceUpdate = true;  // always refresh before reading

  return ctgGetTableMetaImpl(pCatalog, pRpc, pMgmtEps, pTableName, forceUpdate, pTableMeta);
}

H
Haojun Liao 已提交
707 708
/*
 * Return the vgroups a table's data is distributed over, as a new SArray of
 * SVgroupInfo.
 *
 * For a super table this is every vgroup of its database. For any other table
 * it is the single vgroup named by the table meta's vgId. *pVgroupList is set
 * to NULL first and receives the array only on success; the caller owns it.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_CTG_INVALID_INPUT for a NULL argument,
 * TSDB_CODE_CTG_INTERNAL_ERROR when the vgId is unknown to the database or the
 * array push fails, TSDB_CODE_CTG_MEM_ERROR when the array cannot be created,
 * or the error of a failing lookup.
 */
int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgroupList) {
  if (!(pCatalog && pRpc && pMgmtEps && pTableName && pVgroupList)) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  int32_t        code = 0;
  STableMeta    *tbMeta = NULL;
  SDBVgroupInfo *dbVgroup = NULL;
  SArray        *single = NULL;
  SVgroupInfo    vgroupInfo = {0};

  *pVgroupList = NULL;

  CTG_ERR_JRET(catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, pTableName, &tbMeta));

  char db[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, db);
  CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, db, false, &dbVgroup));

  if (TSDB_SUPER_TABLE == tbMeta->tableType) {
    CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, dbVgroup, pVgroupList));
  } else {
    int32_t vgId = tbMeta->vgId;

    // copy the one vgroup out of the database's vgroup hash
    if (NULL == taosHashGetClone(dbVgroup->vgInfo, &vgId, sizeof(vgId), &vgroupInfo)) {
      ctgError("vgId[%d] not found in vgroup list", vgId);
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
    }

    single = taosArrayInit(1, sizeof(SVgroupInfo));
    if (NULL == single) {
      ctgError("taosArrayInit failed");
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
    }

    if (NULL == taosArrayPush(single, &vgroupInfo)) {
      ctgError("push vgroupInfo to array failed");
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
    }

    // hand the array to the caller; clearing the local keeps the cleanup off it
    *pVgroupList = single;
    single = NULL;
  }

_return:
  tfree(tbMeta);

  // ctgGetDBVgroup() returned the entry read-locked and acquired
  if (dbVgroup) {
    CTG_UNLOCK(CTG_READ, &dbVgroup->lock);
    taosHashRelease(pCatalog->dbCache.cache, dbVgroup);
  }

  if (single) {
    taosArrayDestroy(single);
    single = NULL;
  }

  CTG_RET(code);
}


H
Haojun Liao 已提交
767
/*
 * Determine which vgroup a table belongs to by hashing its name over the
 * vgroup ranges of its database.
 *
 * @param pCatalog      catalog handle
 * @param pTransporter  rpc handle, used when the database's vgroup info has to
 *                      be fetched
 * @param pMgmtEps      mnode endpoints, used in the same case
 * @param pTableName    table to place
 * @param pVgroup       out: copy of the owning vgroup; written only on success
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_CTG_INVALID_INPUT when pCatalog,
 * pTableName or pVgroup is NULL, or the error of a failing step.
 */
int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter, const SEpSet *pMgmtEps, const SName *pTableName, SVgroupInfo *pVgroup) {
  // only the pointers that are dereferenced on every path are checked here;
  // pTransporter/pMgmtEps are needed solely when the cache misses
  if (NULL == pCatalog || NULL == pTableName || NULL == pVgroup) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  SDBVgroupInfo* dbInfo = NULL;
  int32_t code = 0;

  char db[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, db);

  CTG_ERR_RET(ctgGetDBVgroup(pCatalog, pTransporter, pMgmtEps, db, false, &dbInfo));

  CTG_ERR_JRET(ctgGetVgInfoFromHashValue(dbInfo, pTableName, pVgroup));

_return:

  // ctgGetDBVgroup() returned the entry read-locked and acquired
  if (dbInfo) {
    CTG_UNLOCK(CTG_READ, &dbInfo->lock);
    taosHashRelease(pCatalog->dbCache.cache, dbInfo);
  }

  CTG_RET(code);
}


D
dapan1121 已提交
789 790
/*
 * Resolve the metadata requested in `pReq` into `pRsp`.
 *
 * Only the table-name list is handled: for each SName in pReq->pTableName the
 * table meta is obtained and its pointer appended to pRsp->pTableMeta, a new
 * SArray of STableMeta pointers. If pReq->pTableName is NULL nothing is done.
 *
 * If a lookup or push fails, every meta collected so far and the array itself
 * are freed and pRsp->pTableMeta is reset to NULL.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_CTG_INVALID_INPUT for a NULL argument
 * or an empty name list, TSDB_CODE_CTG_MEM_ERROR on array failures, or the
 * error of a failing lookup.
 */
int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp) {
  if (!(pCatalog && pRpc && pMgmtEps && pReq && pRsp)) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  int32_t code = 0;

  if (NULL == pReq->pTableName) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t tbNum = (int32_t)taosArrayGetSize(pReq->pTableName);
  if (tbNum <= 0) {
    ctgError("empty table name list");
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  pRsp->pTableMeta = taosArrayInit(tbNum, POINTER_BYTES);
  if (NULL == pRsp->pTableMeta) {
    ctgError("taosArrayInit num[%d] failed", tbNum);
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
  }

  for (int32_t i = 0; i < tbNum; ++i) {
    SName      *name = taosArrayGet(pReq->pTableName, i);
    STableMeta *pTableMeta = NULL;

    CTG_ERR_JRET(catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, name, &pTableMeta));

    if (NULL == taosArrayPush(pRsp->pTableMeta, &pTableMeta)) {
      ctgError("taosArrayPush failed, idx:%d", i);
      // this meta never made it into the array, so free it separately
      tfree(pTableMeta);
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
    }
  }

  return TSDB_CODE_SUCCESS;

_return:

  if (pRsp->pTableMeta) {
    int32_t collected = taosArrayGetSize(pRsp->pTableMeta);
    for (int32_t k = 0; k < collected; ++k) {
      STableMeta *pMeta = taosArrayGetP(pRsp->pTableMeta, k);
      tfree(pMeta);
    }

    taosArrayDestroy(pRsp->pTableMeta);
    pRsp->pTableMeta = NULL;
  }

  CTG_RET(code);
}
D
dapan 已提交
840

D
dapan1121 已提交
841 842
/*
 * Placeholder for fetching the qnode list: validates the arguments and
 * returns without touching `pQnodeList`.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CTG_INVALID_INPUT for a NULL argument.
 */
int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray* pQnodeList) {
  if (!(pCatalog && pRpc && pMgmtEps && pQnodeList)) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  return TSDB_CODE_SUCCESS;
}


D
dapan 已提交
851 852 853 854 855 856 857 858 859
/*
 * Tear down the catalog module by destroying the cluster hash, after which
 * catalogInit() may be called again. Does nothing if the module is not
 * initialised.
 *
 * NOTE(review): the SCatalog objects referenced by the hash are not freed
 * here (the original marks this spot TBD).
 */
void catalogDestroy(void) {
  if (NULL == ctgMgmt.pCluster) {
    return;
  }

  taosHashCleanup(ctgMgmt.pCluster); //TBD
  ctgMgmt.pCluster = NULL;
}