catalog.c 21.5 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
H
Haojun Liao 已提交
14 15 16
 */

#include "catalogInt.h"
D
dapan1121 已提交
17
#include "trpc.h"
D
dapan1121 已提交
18
#include "query.h"
D
dapan1121 已提交
19
#include "tname.h"
20

D
dapan1121 已提交
21 22
SCatalogMgmt ctgMgmt = {0};

23
int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, SDBVgroupInfo *dbInfo, int32_t *exist) {
D
dapan1121 已提交
24 25 26 27 28
  if (NULL == pCatalog->dbCache.cache) {
    *exist = 0;
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
29 30
  SDBVgroupInfo *info = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName));

31
  if (NULL == info) {
D
dapan1121 已提交
32 33 34
    *exist = 0;
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
35 36

  if (dbInfo) {
37
    *dbInfo = *info;
D
dapan1121 已提交
38
  }
D
dapan1121 已提交
39

D
dapan1121 已提交
40
  *exist = 1;
D
dapan1121 已提交
41 42 43 44 45 46 47 48 49 50 51
  
  return TSDB_CODE_SUCCESS;
}



int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SBuildUseDBInput *input, SUseDbOutput *out) {
  char *msg = NULL;
  SEpSet *pVnodeEpSet = NULL;
  int32_t msgLen = 0;

D
catalog  
dapan1121 已提交
52
  CTG_ERR_RET(queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)](input, &msg, 0, &msgLen));
D
ut test  
dapan1121 已提交
53
  
D
dapan1121 已提交
54
  SRpcMsg rpcMsg = {
H
Hongze Cheng 已提交
55
      .msgType = TDMT_MND_USE_DB,
D
catalog  
dapan1121 已提交
56
      .pCont   = msg,
D
dapan1121 已提交
57 58 59 60 61 62
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};

  rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
63 64
  if (TSDB_CODE_SUCCESS != rpcRsp.code) {
    ctgError("error rsp for use db, code:%x", rpcRsp.code);
D
dapan1121 已提交
65
    CTG_ERR_RET(rpcRsp.code);
D
dapan1121 已提交
66
  }
D
dapan1121 已提交
67

D
catalog  
dapan1121 已提交
68
  CTG_ERR_RET(queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)](out, rpcRsp.pCont, rpcRsp.contLen));
D
dapan1121 已提交
69

D
dapan1121 已提交
70 71
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
72 73


D
dapan1121 已提交
74
int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const char *dbName, const char* pTableName, STableMeta** pTableMeta, int32_t *exist) {
D
dapan1121 已提交
75 76 77 78 79
  if (NULL == pCatalog->tableCache.cache) {
    *exist = 0;
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
80
  char tbFullName[TSDB_TABLE_FNAME_LEN];
D
dapan1121 已提交
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99

  snprintf(tbFullName, sizeof(tbFullName), "%s.%s", dbName, pTableName);

  STableMeta *tbMeta = taosHashGet(pCatalog->tableCache.cache, tbFullName, strlen(tbFullName));

  if (NULL == tbMeta) {
    *exist = 0;
    return TSDB_CODE_SUCCESS;
  }

  if (tbMeta->tableType == TSDB_CHILD_TABLE) {
    STableMeta **stbMeta = taosHashGet(pCatalog->tableCache.stableCache, &tbMeta->suid, sizeof(tbMeta->suid));
    if (NULL == stbMeta || NULL == *stbMeta) {
      *exist = 0;
      return TSDB_CODE_SUCCESS;
    }

    if ((*stbMeta)->suid != tbMeta->suid) {
      ctgError("stable cache error, expected suid:%"PRId64 ",actual suid:%"PRId64, tbMeta->suid, (*stbMeta)->suid);
D
dapan1121 已提交
100
      CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
101 102 103 104 105 106
    }

    int32_t metaSize = sizeof(STableMeta) + ((*stbMeta)->tableInfo.numOfTags + (*stbMeta)->tableInfo.numOfColumns) * sizeof(SSchema);
    *pTableMeta = calloc(1, metaSize);
    if (NULL == *pTableMeta) {
      ctgError("calloc size[%d] failed", metaSize);
D
dapan1121 已提交
107
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
108 109 110 111 112 113 114 115 116
    }

    memcpy(*pTableMeta, tbMeta, sizeof(SCTableMeta));
    memcpy(&(*pTableMeta)->sversion, &(*stbMeta)->sversion, metaSize - sizeof(SCTableMeta));
  } else {
    int32_t metaSize = sizeof(STableMeta) + (tbMeta->tableInfo.numOfTags + tbMeta->tableInfo.numOfColumns) * sizeof(SSchema);
    *pTableMeta = calloc(1, metaSize);
    if (NULL == *pTableMeta) {
      ctgError("calloc size[%d] failed", metaSize);
D
dapan1121 已提交
117
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
    }

    memcpy(*pTableMeta, tbMeta, metaSize);
  }

  *exist = 1;
  
  return TSDB_CODE_SUCCESS;
}

void ctgGenEpSet(SEpSet *epSet, SVgroupInfo *vgroupInfo) {
  epSet->inUse = 0;
  epSet->numOfEps = vgroupInfo->numOfEps;

  for (int32_t i = 0; i < vgroupInfo->numOfEps; ++i) {
    memcpy(&epSet->port[i], &vgroupInfo->epAddr[i].port, sizeof(epSet->port[i]));
    memcpy(&epSet->fqdn[i], &vgroupInfo->epAddr[i].fqdn, sizeof(epSet->fqdn[i]));
  }
}

D
dapan1121 已提交
138 139 140 141 142 143 144 145 146 147 148 149 150 151
int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char *pDBName, const char* pTableName, STableMetaOutput* output) {
  if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == output) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  char tbFullName[TSDB_TABLE_FNAME_LEN];

  snprintf(tbFullName, sizeof(tbFullName), "%s.%s", pDBName, pTableName);

  SBuildTableMetaInput bInput = {.vgId = 0, .tableFullName = tbFullName};
  char *msg = NULL;
  SEpSet *pVnodeEpSet = NULL;
  int32_t msgLen = 0;

D
catalog  
dapan1121 已提交
152
  CTG_ERR_RET(queryBuildMsg[TMSG_INDEX(TDMT_MND_STB_META)](&bInput, &msg, 0, &msgLen));
D
dapan1121 已提交
153 154 155 156 157 158

  SRpcMsg rpcMsg = {
      .msgType = TDMT_MND_STB_META,
      .pCont   = msg,
      .contLen = msgLen,
  };
D
dapan1121 已提交
159

D
dapan1121 已提交
160 161 162 163 164 165 166 167 168
  SRpcMsg rpcRsp = {0};

  rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
  
  if (TSDB_CODE_SUCCESS != rpcRsp.code) {
    ctgError("error rsp for table meta, code:%x", rpcRsp.code);
    CTG_ERR_RET(rpcRsp.code);
  }

D
catalog  
dapan1121 已提交
169
  CTG_ERR_RET(queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_STB_META)](output, rpcRsp.pCont, rpcRsp.contLen));
D
dapan1121 已提交
170 171 172 173 174 175

  return TSDB_CODE_SUCCESS;
}


int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char *pDBName, const char* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) {
D
dapan1121 已提交
176
  if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == vgroupInfo || NULL == output) {
D
dapan1121 已提交
177
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
178 179
  }

D
dapan1121 已提交
180
  char tbFullName[TSDB_TABLE_FNAME_LEN];
D
dapan1121 已提交
181 182 183 184 185 186 187 188

  snprintf(tbFullName, sizeof(tbFullName), "%s.%s", pDBName, pTableName);

  SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .tableFullName = tbFullName};
  char *msg = NULL;
  SEpSet *pVnodeEpSet = NULL;
  int32_t msgLen = 0;

D
catalog  
dapan1121 已提交
189
  CTG_ERR_RET(queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)](&bInput, &msg, 0, &msgLen));
D
dapan1121 已提交
190 191

  SRpcMsg rpcMsg = {
H
Hongze Cheng 已提交
192
      .msgType = TDMT_VND_TABLE_META,
D
dapan1121 已提交
193 194 195 196 197 198 199 200 201 202
      .pCont   = msg,
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
  SEpSet  epSet;
  
  ctgGenEpSet(&epSet, vgroupInfo);

  rpcSendRecv(pRpc, &epSet, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
203
  
D
dapan1121 已提交
204
  if (TSDB_CODE_SUCCESS != rpcRsp.code) {
D
dapan1121 已提交
205
    ctgError("error rsp for table meta, code:%x", rpcRsp.code);
D
dapan1121 已提交
206
    CTG_ERR_RET(rpcRsp.code);
D
dapan1121 已提交
207 208
  }

D
catalog  
dapan1121 已提交
209
  CTG_ERR_RET(queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)](output, rpcRsp.pCont, rpcRsp.contLen));
D
dapan1121 已提交
210 211 212 213 214

  return TSDB_CODE_SUCCESS;
}


215 216
int32_t ctgGetHashFunction(int8_t hashMethod, tableNameHashFp *fp) {
  switch (hashMethod) {
D
dapan1121 已提交
217 218 219 220 221 222 223 224
    default:
      *fp = MurmurHash3_32;
      break;
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
225
int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, SDBVgroupInfo *dbInfo, SArray** vgroupList) {
D
dapan1121 已提交
226
  SHashObj *vgroupHash = NULL;
227 228
  SVgroupInfo *vgInfo = NULL;

D
dapan 已提交
229 230 231 232 233 234
  *vgroupList = taosArrayInit(taosHashGetSize(dbInfo->vgInfo), sizeof(SVgroupInfo));
  if (NULL == *vgroupList) {
    ctgError("taosArrayInit failed");
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);    
  }

235 236 237
  void *pIter = taosHashIterate(dbInfo->vgInfo, NULL);
  while (pIter) {
    vgInfo = pIter;
D
dapan1121 已提交
238

D
dapan 已提交
239
    if (NULL == taosArrayPush(*vgroupList, vgInfo)) {
240
      ctgError("taosArrayPush failed");
D
dapan1121 已提交
241
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
242 243 244 245
    }
    
    pIter = taosHashIterate(dbInfo->vgInfo, pIter);
    vgInfo = NULL;
D
dapan1121 已提交
246 247 248 249 250
  }

  return TSDB_CODE_SUCCESS;
}

251 252 253 254
int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) {
  int32_t vgNum = taosHashGetSize(dbInfo->vgInfo);
  if (vgNum <= 0) {
    ctgError("db[%s] vgroup cache invalid, vgroup number:%d", pDBName, vgNum);
D
dapan1121 已提交
255
    CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED);
D
dapan1121 已提交
256 257
  }

258 259
  tableNameHashFp fp = NULL;
  SVgroupInfo *vgInfo = NULL;
D
dapan1121 已提交
260

261 262 263 264 265 266 267 268 269 270 271 272 273
  CTG_ERR_RET(ctgGetHashFunction(dbInfo->hashMethod, &fp));

  char tbFullName[TSDB_TABLE_FNAME_LEN];

  snprintf(tbFullName, sizeof(tbFullName), "%s.%s", pDBName, pTableName);

  uint32_t hashValue = (*fp)(tbFullName, (uint32_t)strlen(tbFullName));

  void *pIter = taosHashIterate(dbInfo->vgInfo, NULL);
  while (pIter) {
    vgInfo = pIter;
    if (hashValue >= vgInfo->hashBegin && hashValue <= vgInfo->hashEnd) {
      break;
D
dapan1121 已提交
274
    }
275 276 277
    
    pIter = taosHashIterate(dbInfo->vgInfo, pIter);
    vgInfo = NULL;
D
dapan1121 已提交
278 279
  }

280 281
  if (NULL == vgInfo) {
    ctgError("no hash range found for hashvalue[%u]", hashValue);
D
dapan1121 已提交
282
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
283 284 285 286
  }

  *pVgroup = *vgInfo;

D
dapan1121 已提交
287 288 289 290
  return TSDB_CODE_SUCCESS;
}

int32_t ctgGetTableMetaImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, bool forceUpdate, STableMeta** pTableMeta) {
D
dapan1121 已提交
291
  if (NULL == pCatalog || NULL == pDBName || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) {
D
dapan1121 已提交
292
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310
  }

  int32_t exist = 0;

  if (!forceUpdate) {  
    CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pDBName, pTableName, pTableMeta, &exist));

    if (exist) {
      return TSDB_CODE_SUCCESS;
    }
  }

  CTG_ERR_RET(catalogRenewTableMeta(pCatalog, pRpc, pMgmtEps, pDBName, pTableName));

  CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pDBName, pTableName, pTableMeta, &exist));

  if (0 == exist) {
    ctgError("get table meta from cache failed, but fetch succeed");
D
dapan1121 已提交
311
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
312 313 314 315 316 317
  }
  
  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
318
int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output) {
D
dapan1121 已提交
319 320
  if (output->metaNum != 1 && output->metaNum != 2) {
    ctgError("invalid table meta number[%d] got from meta rsp", output->metaNum);
D
dapan1121 已提交
321
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
322 323 324 325
  }

  if (NULL == output->tbMeta) {
    ctgError("no valid table meta got from meta rsp");
D
dapan1121 已提交
326
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
327 328 329 330 331 332
  }

  if (NULL == pCatalog->tableCache.cache) {
    pCatalog->tableCache.cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
    if (NULL == pCatalog->tableCache.cache) {
      ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum);
D
dapan1121 已提交
333
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
334
    }
D
dapan1121 已提交
335
  }
D
dapan1121 已提交
336

D
dapan1121 已提交
337
  if (NULL == pCatalog->tableCache.stableCache) {
D
dapan1121 已提交
338 339 340
    pCatalog->tableCache.stableCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
    if (NULL == pCatalog->tableCache.stableCache) {
      ctgError("init hash[%d] for stablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum);
D
dapan1121 已提交
341
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
342 343 344 345 346 347
    }
  }

  if (output->metaNum == 2) {
    if (taosHashPut(pCatalog->tableCache.cache, output->ctbFname, strlen(output->ctbFname), &output->ctbMeta, sizeof(output->ctbMeta)) != 0) {
      ctgError("push ctable[%s] to table cache failed", output->ctbFname);
D
dapan1121 已提交
348
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
349 350 351 352
    }

    if (TSDB_SUPER_TABLE != output->tbMeta->tableType) {
      ctgError("table type[%d] error, expected:%d", output->tbMeta->tableType, TSDB_SUPER_TABLE);
D
dapan1121 已提交
353
      CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
354 355 356
    }    
  }

D
dapan1121 已提交
357 358
  int32_t tbSize = sizeof(*output->tbMeta) + sizeof(SSchema) * (output->tbMeta->tableInfo.numOfColumns + output->tbMeta->tableInfo.numOfTags);
  if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) {
D
dapan1121 已提交
359
    ctgError("push table[%s] to table cache failed", output->tbFname);
D
dapan1121 已提交
360
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
361 362 363 364 365
  }

  if (TSDB_SUPER_TABLE == output->tbMeta->tableType) {
    if (taosHashPut(pCatalog->tableCache.stableCache, &output->tbMeta->suid, sizeof(output->tbMeta->suid), &output->tbMeta, POINTER_BYTES) != 0) {
      ctgError("push suid[%"PRIu64"] to stable cache failed", output->tbMeta->suid);
D
dapan1121 已提交
366
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
367 368 369 370 371 372
    }
  }
  
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
373
int32_t catalogInit(SCatalogCfg *cfg) {
D
dapan1121 已提交
374 375 376
  if (ctgMgmt.pCluster) {
    ctgError("catalog already init");
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
377 378 379 380 381 382 383
  }

  if (cfg) {
    memcpy(&ctgMgmt.cfg, cfg, sizeof(*cfg));
  } else {
    ctgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
    ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TABLEMETA_NUMBER;
D
dapan 已提交
384 385
  }

D
catalog  
dapan1121 已提交
386
  if (CTG_CACHE_ENABLED()) {
D
dapan1121 已提交
387 388 389 390 391 392
    ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
    if (NULL == ctgMgmt.pCluster) {
      CTG_ERR_LRET(TSDB_CODE_CTG_INTERNAL_ERROR, "init %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER);
    }
  }

D
dapan 已提交
393
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
394 395
}

396
int32_t catalogGetHandle(const char* clusterId , struct SCatalog** catalogHandle) {
D
dapan1121 已提交
397
  if (NULL == clusterId || NULL == catalogHandle) {
D
dapan1121 已提交
398
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan 已提交
399 400 401 402
  }

  if (NULL == ctgMgmt.pCluster) {
    ctgError("cluster cache are not ready");
D
dapan1121 已提交
403
    CTG_ERR_RET(TSDB_CODE_CTG_NOT_READY);
D
dapan 已提交
404 405 406
  }

  size_t clen = strlen(clusterId);
D
catalog  
dapan1121 已提交
407
  SCatalog **ctg = (SCatalog **)taosHashGet(ctgMgmt.pCluster, clusterId, clen);
D
dapan 已提交
408

D
catalog  
dapan1121 已提交
409 410
  if (ctg && (*ctg)) {
    *catalogHandle = *ctg;
D
dapan1121 已提交
411
    return TSDB_CODE_SUCCESS;
D
dapan 已提交
412 413
  }

D
catalog  
dapan1121 已提交
414
  SCatalog *clusterCtg = calloc(1, sizeof(SCatalog));
D
dapan 已提交
415
  if (NULL == clusterCtg) {
D
catalog  
dapan1121 已提交
416
    ctgError("calloc %d failed", (int32_t)sizeof(SCatalog));
D
dapan1121 已提交
417
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan 已提交
418 419 420 421 422
  }

  if (taosHashPut(ctgMgmt.pCluster, clusterId, clen, &clusterCtg, POINTER_BYTES)) {
    ctgError("put cluster %s cache to hash failed", clusterId);
    tfree(clusterCtg);
D
dapan1121 已提交
423
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan 已提交
424
  }
D
dapan1121 已提交
425 426

  *catalogHandle = clusterCtg;
D
dapan 已提交
427
  
D
dapan1121 已提交
428
  return TSDB_CODE_SUCCESS;
D
dapan 已提交
429 430
}

D
dapan1121 已提交
431 432
int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version) {
  if (NULL == pCatalog || NULL == dbName || NULL == version) {
D
dapan1121 已提交
433
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
434 435 436 437 438 439 440 441 442 443 444 445 446
  }

  if (NULL == pCatalog->dbCache.cache) {
    *version = CTG_DEFAULT_INVALID_VERSION;
    return TSDB_CODE_SUCCESS;
  }

  SDBVgroupInfo * dbInfo = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName));
  if (NULL == dbInfo) {
    *version = CTG_DEFAULT_INVALID_VERSION;
    return TSDB_CODE_SUCCESS;
  }

447
  *version = dbInfo->vgVersion;
D
dapan1121 已提交
448 449 450 451

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
452
int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) {
D
dapan1121 已提交
453
  if (NULL == pCatalog || NULL == dbName || NULL == dbInfo) {
D
dapan1121 已提交
454
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
455 456
  }

457
  if (dbInfo->vgVersion < 0) {
D
dapan1121 已提交
458 459 460 461 462 463 464
    if (pCatalog->dbCache.cache) {
      taosHashRemove(pCatalog->dbCache.cache, dbName, strlen(dbName));
    }
    
    ctgWarn("remove db [%s] from cache", dbName);
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
465

D
dapan1121 已提交
466
  if (NULL == pCatalog->dbCache.cache) {
D
dapan1121 已提交
467
    pCatalog->dbCache.cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
D
dapan1121 已提交
468
    if (NULL == pCatalog->dbCache.cache) {
D
dapan1121 已提交
469
      ctgError("init hash[%d] for db cache failed", CTG_DEFAULT_CACHE_DB_NUMBER);
D
dapan1121 已提交
470
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
471
    }
472 473 474 475 476 477
  } else {
    SDBVgroupInfo *oldInfo = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName));
    if (oldInfo && oldInfo->vgInfo) {
      taosHashCleanup(oldInfo->vgInfo);
      oldInfo->vgInfo = NULL;
    }
D
dapan1121 已提交
478 479 480 481
  }

  if (taosHashPut(pCatalog->dbCache.cache, dbName, strlen(dbName), dbInfo, sizeof(*dbInfo)) != 0) {
    ctgError("push to vgroup hash cache failed");
D
dapan1121 已提交
482
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
483 484 485
  }

  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
486 487 488
}


D
dapan1121 已提交
489 490


491
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo) {
D
dapan1121 已提交
492
  if (NULL == pCatalog || NULL == dbName || NULL == pRpc || NULL == pMgmtEps) {
D
dapan1121 已提交
493
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
494 495 496 497 498 499 500 501 502 503 504 505
  }

  int32_t exist = 0;

  if (0 == forceUpdate) {
    CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &exist));

    if (exist) {
      return TSDB_CODE_SUCCESS;
    }
  }

D
dapan1121 已提交
506 507 508 509 510
  SUseDbOutput DbOut = {0};
  SBuildUseDBInput input = {0};

  strncpy(input.db, dbName, sizeof(input.db));
  input.db[sizeof(input.db) - 1] = 0;
511
  input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
D
dapan1121 已提交
512
  
D
dapan1121 已提交
513
  CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &input, &DbOut));
D
dapan1121 已提交
514

515
  CTG_ERR_RET(catalogUpdateDBVgroupCache(pCatalog, dbName, &DbOut.dbVgroup));
D
dapan1121 已提交
516 517

  if (dbInfo) {
D
dapan1121 已提交
518
    *dbInfo = DbOut.dbVgroup;
D
dapan1121 已提交
519 520
  }

D
dapan1121 已提交
521
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
522 523
}

524 525
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) {
  return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pDBName, pTableName, false, pTableMeta);
D
dapan1121 已提交
526
}
D
dapan1121 已提交
527

D
dapan1121 已提交
528 529
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName) {
  if (NULL == pCatalog || NULL == pDBName || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName) {
D
dapan1121 已提交
530
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan 已提交
531 532
  }

D
dapan1121 已提交
533 534
  SVgroupInfo vgroupInfo = {0};
  
D
dapan1121 已提交
535
  CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &vgroupInfo));
D
dapan1121 已提交
536

D
dapan1121 已提交
537 538
  STableMetaOutput output = {0};
  
D
dapan1121 已提交
539 540 541
  //CTG_ERR_RET(ctgGetTableMetaFromVnode(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &vgroupInfo, &output));

  CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &output));
D
dapan1121 已提交
542

D
dapan1121 已提交
543
  CTG_ERR_RET(ctgUpdateTableMetaCache(pCatalog, &output));
D
dapan1121 已提交
544

D
dapan1121 已提交
545 546
  tfree(output.tbMeta);
  
D
dapan1121 已提交
547
  return TSDB_CODE_SUCCESS;
548
}
549

D
dapan1121 已提交
550
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) {
D
dapan1121 已提交
551 552 553
  return ctgGetTableMetaImpl(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, true, pTableMeta);
}

D
dapan 已提交
554
int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray** pVgroupList) {
D
dapan1121 已提交
555
  if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == pVgroupList) {
D
dapan1121 已提交
556
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
557 558 559 560 561
  }
  
  STableMeta *tbMeta = NULL;
  int32_t code = 0;
  SVgroupInfo vgroupInfo = {0};
562
  SDBVgroupInfo dbVgroup = {0};
D
dapan1121 已提交
563 564 565
  
  CTG_ERR_JRET(catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &tbMeta));

566
  CTG_ERR_JRET(catalogGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbVgroup));
D
dapan 已提交
567

568 569
  if (tbMeta->tableType == TSDB_SUPER_TABLE) {
    CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, &dbVgroup, pVgroupList));
D
dapan1121 已提交
570
  } else {
571 572 573
    int32_t vgId = tbMeta->vgId;
    if (NULL == taosHashGetClone(dbVgroup.vgInfo, &vgId, sizeof(vgId), &vgroupInfo)) {
      ctgError("vgId[%d] not found in vgroup list", vgId);
D
dapan 已提交
574
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);    
575
    }
D
dapan1121 已提交
576

D
dapan 已提交
577 578 579 580 581 582 583
    *pVgroupList = taosArrayInit(1, sizeof(SVgroupInfo));
    if (NULL == *pVgroupList) {
      ctgError("taosArrayInit failed");
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);    
    }

    if (NULL == taosArrayPush(*pVgroupList, &vgroupInfo)) {
D
dapan1121 已提交
584 585 586 587 588
      ctgError("push vgroupInfo to array failed");
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
    }
  }

D
dapan 已提交
589 590 591 592
  tfree(tbMeta);

  return TSDB_CODE_SUCCESS;

D
dapan1121 已提交
593 594
_return:
  tfree(tbMeta);
D
dapan 已提交
595 596

  taosArrayDestroy(*pVgroupList);
D
dapan1121 已提交
597
  
D
dapan1121 已提交
598
  CTG_RET(code);
D
dapan1121 已提交
599 600 601
}


602
int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter, const SEpSet *pMgmtEps, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) {
D
dapan1121 已提交
603 604 605 606
  SDBVgroupInfo dbInfo = {0};
  int32_t code = 0;
  int32_t vgId = 0;

607
  CTG_ERR_RET(catalogGetDBVgroup(pCatalog, pTransporter, pMgmtEps, pDBName, false, &dbInfo));
D
dapan1121 已提交
608 609 610

  if (dbInfo.vgVersion < 0 || NULL == dbInfo.vgInfo) {
    ctgError("db[%s] vgroup cache invalid, vgroup version:%d, vgInfo:%p", pDBName, dbInfo.vgVersion, dbInfo.vgInfo);
D
dapan1121 已提交
611
    CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED);
D
dapan1121 已提交
612 613 614 615
  }

  CTG_ERR_RET(ctgGetVgInfoFromHashValue(&dbInfo, pDBName, pTableName, pVgroup));

D
dapan1121 已提交
616
  CTG_RET(code);
D
dapan1121 已提交
617 618 619
}


D
dapan1121 已提交
620 621
int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp) {
  if (NULL == pCatalog || NULL == pRpc  || NULL == pMgmtEps || NULL == pReq || NULL == pRsp) {
D
dapan1121 已提交
622
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan 已提交
623
  }
D
dapan1121 已提交
624 625 626 627

  int32_t code = 0;

  if (pReq->pTableName) {
628
    char dbName[TSDB_DB_FNAME_LEN];
D
dapan1121 已提交
629 630 631 632 633
    int32_t tbNum = (int32_t)taosArrayGetSize(pReq->pTableName);
    if (tbNum > 0) {
      pRsp->pTableMeta = taosArrayInit(tbNum, POINTER_BYTES);
      if (NULL == pRsp->pTableMeta) {
        ctgError("taosArrayInit num[%d] failed", tbNum);
D
dapan1121 已提交
634
        CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
635 636 637 638 639 640 641
      }
    }
    
    for (int32_t i = 0; i < tbNum; ++i) {
      SName *name = taosArrayGet(pReq->pTableName, i);
      STableMeta *pTableMeta = NULL;
      
H
Haojun Liao 已提交
642
      snprintf(dbName, sizeof(dbName), "%d.%s", name->acctId, name->dbname);
D
dapan1121 已提交
643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666

      CTG_ERR_JRET(catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, dbName, name->tname, &pTableMeta));

      if (NULL == taosArrayPush(pRsp->pTableMeta, &pTableMeta)) {
        ctgError("taosArrayPush failed, idx:%d", i);
        tfree(pTableMeta);
        CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
      }
    }
  }

  return TSDB_CODE_SUCCESS;

_return:  

  if (pRsp->pTableMeta) {
    int32_t aSize = taosArrayGetSize(pRsp->pTableMeta);
    for (int32_t i = 0; i < aSize; ++i) {
      STableMeta *pMeta = taosArrayGetP(pRsp->pTableMeta, i);
      tfree(pMeta);
    }
    
    taosArrayDestroy(pRsp->pTableMeta);
  }
D
dapan 已提交
667
  
D
dapan1121 已提交
668
  CTG_RET(code);
669
}
D
dapan 已提交
670

D
dapan1121 已提交
671 672
int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray* pQnodeList) {
  if (NULL == pCatalog || NULL == pRpc  || NULL == pMgmtEps || NULL == pQnodeList) {
D
dapan 已提交
673 674 675 676 677 678 679 680
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }


  return TSDB_CODE_SUCCESS;
}


D
dapan 已提交
681 682 683 684 685 686 687 688 689
void catalogDestroy(void) {
  if (ctgMgmt.pCluster) {
    taosHashCleanup(ctgMgmt.pCluster); //TBD
    ctgMgmt.pCluster = NULL;
  }
}