catalog.c 20.3 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "catalogInt.h"
D
dapan1121 已提交
17
#include "trpc.h"
D
dapan1121 已提交
18
#include "query.h"
D
dapan1121 已提交
19
#include "tname.h"
20

D
dapan1121 已提交
21 22
/* File-scope catalog management state; the code below uses its pCluster hash (per-cluster catalog handles) and its cfg settings. */
SCatalogMgmt ctgMgmt = {0};

23
/*
 * Look up the vgroup info of database `dbName` in the catalog's db cache.
 *
 * pCatalog  catalog handle whose dbCache is searched (not checked for NULL)
 * dbName    cache key (NUL-terminated)
 * dbInfo    optional; receives a shallow struct copy of the cached entry
 * exist     set to 1 when an entry was found, 0 otherwise
 *
 * Always returns TSDB_CODE_SUCCESS; a miss is reported through `exist`.
 */
int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, SDBVgroupInfo *dbInfo, int32_t *exist) {
  SDBVgroupInfo *cached = NULL;

  // A cache that was never created is treated exactly like a miss.
  if (pCatalog->dbCache.cache != NULL) {
    cached = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName));
  }

  if (cached != NULL && dbInfo != NULL) {
    *dbInfo = *cached;
  }

  *exist = (cached != NULL) ? 1 : 0;

  return TSDB_CODE_SUCCESS;
}



/*
 * Build a USE_DB request from `input`, send it to the endpoints in `pMgmtEps`
 * via rpcSendRecv(), and pass the response payload to the USE_DB response
 * processor, which fills `out`.
 *
 * Returns TSDB_CODE_SUCCESS on success, TSDB_CODE_CTG_MEM_ERROR when the rpc
 * buffer cannot be allocated, or the response code when it is not
 * TSDB_CODE_SUCCESS. Failures of the message builder / response processor are
 * routed through CTG_ERR_RET.
 */
int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SBuildUseDBInput *input, SUseDbOutput *out) {
  int32_t reqLen = 0;
  char   *reqBuf = NULL;

  CTG_ERR_RET(queryBuildMsg[TSDB_MSG_TYPE_USE_DB](input, &reqBuf, 0, &reqLen));

  // The request is sent from a buffer obtained with rpcMallocCont(); the built
  // message is copied into it and then released.
  char *rpcCont = rpcMallocCont(reqLen);
  if (NULL == rpcCont) {
    ctgError("rpc malloc %d failed", reqLen);
    tfree(reqBuf);
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
  }

  memcpy(rpcCont, reqBuf, reqLen);
  tfree(reqBuf);

  SRpcMsg request = {
      .msgType = TSDB_MSG_TYPE_USE_DB,
      .pCont   = rpcCont,
      .contLen = reqLen,
  };
  SRpcMsg response = {0};

  rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &request, &response);

  if (response.code != TSDB_CODE_SUCCESS) {
    ctgError("error rsp for use db, code:%x", response.code);
    CTG_ERR_RET(response.code);
  }

  CTG_ERR_RET(queryProcessMsgRsp[TSDB_MSG_TYPE_USE_DB](out, response.pCont, response.contLen));

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
83 84


D
dapan1121 已提交
85
/*
 * Fetch a heap-allocated copy of a table's meta from the table cache.
 *
 * The cache key is "<dbName>.<pTableName>". For a child table the result is
 * assembled from the cached child header (SCTableMeta part) plus the schema
 * part of its super table, which is looked up by suid in the stable cache.
 * Any other table type is copied as a whole.
 *
 * pTableMeta  receives a calloc'ed STableMeta on a hit
 * exist       set to 1 on a hit, 0 on a miss (including a missing super table)
 *
 * Returns TSDB_CODE_SUCCESS for hit and miss alike; allocation failure and a
 * suid mismatch in the stable cache are reported through CTG_ERR_RET with
 * TSDB_CODE_CTG_MEM_ERROR / TSDB_CODE_CTG_INTERNAL_ERROR.
 */
int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const char *dbName, const char* pTableName, STableMeta** pTableMeta, int32_t *exist) {
  if (NULL == pCatalog->tableCache.cache) {
    *exist = 0;
    return TSDB_CODE_SUCCESS;
  }

  char cacheKey[TSDB_TABLE_FNAME_LEN];
  snprintf(cacheKey, sizeof(cacheKey), "%s.%s", dbName, pTableName);

  STableMeta *cached = taosHashGet(pCatalog->tableCache.cache, cacheKey, strlen(cacheKey));
  if (NULL == cached) {
    *exist = 0;
    return TSDB_CODE_SUCCESS;
  }

  if (TSDB_CHILD_TABLE != cached->tableType) {
    // The entry's own tableInfo describes its schema: duplicate header + schema in one go.
    int32_t wholeSize = sizeof(STableMeta) + (cached->tableInfo.numOfTags + cached->tableInfo.numOfColumns) * sizeof(SSchema);
    *pTableMeta = calloc(1, wholeSize);
    if (NULL == *pTableMeta) {
      ctgError("calloc size[%d] failed", wholeSize);
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }

    memcpy(*pTableMeta, cached, wholeSize);

    *exist = 1;
    return TSDB_CODE_SUCCESS;
  }

  // Child table: the schema comes from the super table entry referenced by suid.
  STableMeta **ppStable = taosHashGet(pCatalog->tableCache.stableCache, &cached->suid, sizeof(cached->suid));
  if (NULL == ppStable || NULL == *ppStable) {
    *exist = 0;
    return TSDB_CODE_SUCCESS;
  }

  STableMeta *stable = *ppStable;
  if (stable->suid != cached->suid) {
    ctgError("stable cache error, expected suid:%"PRId64 ",actual suid:%"PRId64, cached->suid, stable->suid);
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  int32_t mergedSize = sizeof(STableMeta) + (stable->tableInfo.numOfTags + stable->tableInfo.numOfColumns) * sizeof(SSchema);
  *pTableMeta = calloc(1, mergedSize);
  if (NULL == *pTableMeta) {
    ctgError("calloc size[%d] failed", mergedSize);
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
  }

  // First sizeof(SCTableMeta) bytes from the child, everything from `sversion` on from the super table.
  memcpy(*pTableMeta, cached, sizeof(SCTableMeta));
  memcpy(&(*pTableMeta)->sversion, &stable->sversion, mergedSize - sizeof(SCTableMeta));

  *exist = 1;

  return TSDB_CODE_SUCCESS;
}

/*
 * Fill `epSet` from the endpoint addresses of `vgroupInfo`: inUse is reset to
 * 0, numOfEps is copied, and port/fqdn of every endpoint are copied by index.
 */
void ctgGenEpSet(SEpSet *epSet, SVgroupInfo *vgroupInfo) {
  epSet->inUse = 0;
  epSet->numOfEps = vgroupInfo->numOfEps;

  int32_t idx = 0;
  while (idx < vgroupInfo->numOfEps) {
    memcpy(&epSet->port[idx], &vgroupInfo->epAddr[idx].port, sizeof(epSet->port[idx]));
    memcpy(&epSet->fqdn[idx], &vgroupInfo->epAddr[idx].fqdn, sizeof(epSet->fqdn[idx]));
    ++idx;
  }
}


/*
 * Request the meta of table "<pDBName>.<pTableName>" over rpc and let the
 * TABLE_META response processor fill `output`.
 *
 * The request is sent to the endpoint set derived from `vgroupInfo` (see
 * ctgGenEpSet), not to `pMgmtEps`; `pMgmtEps` is only validated here.
 *
 * Returns TSDB_CODE_CTG_INVALID_INPUT when any argument is NULL, the response
 * code when it is not TSDB_CODE_SUCCESS, TSDB_CODE_SUCCESS otherwise. Failures
 * of the message builder / response processor are routed through CTG_ERR_RET.
 */
int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char *pDBName, const char* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) {
  if (!(pCatalog && pRpc && pMgmtEps && pDBName && pTableName && vgroupInfo && output)) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  char fullName[TSDB_TABLE_FNAME_LEN];
  snprintf(fullName, sizeof(fullName), "%s.%s", pDBName, pTableName);

  SBuildTableMetaInput buildInput = {.vgId = vgroupInfo->vgId, .tableFullName = fullName};
  int32_t reqLen = 0;
  char   *reqBuf = NULL;

  CTG_ERR_RET(queryBuildMsg[TSDB_MSG_TYPE_TABLE_META](&buildInput, &reqBuf, 0, &reqLen));

  // NOTE(review): unlike the USE_DB request path, the built buffer is handed to
  // rpc directly and never released here — confirm who owns/frees it.
  SRpcMsg request = {
      .msgType = TSDB_MSG_TYPE_TABLE_META,
      .pCont   = reqBuf,
      .contLen = reqLen,
  };
  SRpcMsg response = {0};

  SEpSet vnodeEps;
  ctgGenEpSet(&vnodeEps, vgroupInfo);

  rpcSendRecv(pRpc, &vnodeEps, &request, &response);

  if (response.code != TSDB_CODE_SUCCESS) {
    ctgError("error rsp for table meta, code:%x", response.code);
    CTG_ERR_RET(response.code);
  }

  CTG_ERR_RET(queryProcessMsgRsp[TSDB_MSG_TYPE_TABLE_META](output, response.pCont, response.contLen));

  return TSDB_CODE_SUCCESS;
}


190 191
/*
 * Select the table-name hash function for `hashMethod` and store it in `*fp`.
 * Every method value currently resolves to MurmurHash3_32.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgGetHashFunction(int8_t hashMethod, tableNameHashFp *fp) {
  // No method-specific choice exists yet, so the argument does not affect the result.
  (void)hashMethod;

  *fp = MurmurHash3_32;

  return TSDB_CODE_SUCCESS;
}

200
/*
 * Append every vgroup entry of `dbInfo->vgInfo` to `vgroupList`.
 *
 * pCatalog, pRpc and pMgmtEps are not used by this function.
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CTG_MEM_ERROR (via CTG_ERR_RET) when
 * taosArrayPush fails.
 */
int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, SDBVgroupInfo *dbInfo, SArray* vgroupList) {
  for (void *pIter = taosHashIterate(dbInfo->vgInfo, NULL); pIter != NULL; pIter = taosHashIterate(dbInfo->vgInfo, pIter)) {
    SVgroupInfo *entry = pIter;

    if (NULL == taosArrayPush(vgroupList, entry)) {
      ctgError("taosArrayPush failed");
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }
  }

  return TSDB_CODE_SUCCESS;
}

220 221 222 223
/*
 * Find the vgroup whose [hashBegin, hashEnd] range contains the hash of
 * "<pDBName>.<pTableName>" and copy it into `*pVgroup`.
 *
 * Returns TSDB_CODE_TSC_DB_NOT_SELECTED when the db has no vgroups,
 * TSDB_CODE_CTG_INTERNAL_ERROR when no range matches, TSDB_CODE_SUCCESS
 * otherwise (errors go through CTG_ERR_RET).
 */
int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) {
  int32_t vgCount = taosHashGetSize(dbInfo->vgInfo);
  if (vgCount <= 0) {
    ctgError("db[%s] vgroup cache invalid, vgroup number:%d", pDBName, vgCount);
    CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED);
  }

  tableNameHashFp hashFn = NULL;
  CTG_ERR_RET(ctgGetHashFunction(dbInfo->hashMethod, &hashFn));

  char fullName[TSDB_TABLE_FNAME_LEN];
  snprintf(fullName, sizeof(fullName), "%s.%s", pDBName, pTableName);

  uint32_t hashValue = (*hashFn)(fullName, (uint32_t)strlen(fullName));

  // Scan the vgroups until one claims the hash value; `owner` stays NULL when none does.
  SVgroupInfo *owner = NULL;
  for (void *pIter = taosHashIterate(dbInfo->vgInfo, NULL); pIter != NULL; pIter = taosHashIterate(dbInfo->vgInfo, pIter)) {
    SVgroupInfo *candidate = pIter;
    if (hashValue >= candidate->hashBegin && hashValue <= candidate->hashEnd) {
      owner = candidate;
      break;
    }
  }

  if (NULL == owner) {
    ctgError("no hash range found for hashvalue[%u]", hashValue);
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  *pVgroup = *owner;

  return TSDB_CODE_SUCCESS;
}

/*
 * Shared implementation behind the table-meta getters.
 *
 * Unless `forceUpdate` is set, the table cache is consulted first. On a miss
 * (or when forced) the meta is renewed through catalogRenewTableMeta() and
 * then read back from the cache into `*pTableMeta`.
 *
 * Returns TSDB_CODE_CTG_INVALID_INPUT when any pointer argument is NULL,
 * TSDB_CODE_CTG_INTERNAL_ERROR when the entry is still missing after a
 * successful renew, TSDB_CODE_SUCCESS otherwise.
 */
int32_t ctgGetTableMetaImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, bool forceUpdate, STableMeta** pTableMeta) {
  if (!(pCatalog && pDBName && pRpc && pMgmtEps && pTableName && pTableMeta)) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  int32_t found = 0;

  if (!forceUpdate) {
    CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pDBName, pTableName, pTableMeta, &found));
    if (found) {
      return TSDB_CODE_SUCCESS;
    }
  }

  // Cache miss or forced refresh: renew the cache entry, then read it back.
  CTG_ERR_RET(catalogRenewTableMeta(pCatalog, pRpc, pMgmtEps, pDBName, pTableName));
  CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pDBName, pTableName, pTableMeta, &found));

  if (!found) {
    ctgError("get table meta from cache failed, but fetch succeed");
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
287
/*
 * Store the table meta(s) carried by `output` in the catalog's table caches.
 *
 * output->metaNum == 1: `tbMeta` is stored under `tbFname`.
 * output->metaNum == 2: `ctbMeta` is stored under `ctbFname`, and `tbMeta`
 *                       (which must be a super table) under `tbFname`.
 * A super table is additionally indexed by suid in the stable cache.
 *
 * Both hash tables are created on first use. `output` stays owned by the
 * caller: the caches keep their own copy of the data.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_CTG_MEM_ERROR when a cache cannot be
 * created, or TSDB_CODE_CTG_INTERNAL_ERROR for invalid input / failed inserts.
 */
int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output) {
  if (output->metaNum != 1 && output->metaNum != 2) {
    ctgError("invalid table meta number[%d] got from meta rsp", output->metaNum);
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  if (NULL == output->tbMeta) {
    ctgError("no valid table meta got from meta rsp");
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  if (NULL == pCatalog->tableCache.cache) {
    pCatalog->tableCache.cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
    if (NULL == pCatalog->tableCache.cache) {
      ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum);
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }
  }

  // The stable cache needs its own guard: testing `cache` a second time would
  // never be true here, leaving stableCache uninitialised.
  if (NULL == pCatalog->tableCache.stableCache) {
    pCatalog->tableCache.stableCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
    if (NULL == pCatalog->tableCache.stableCache) {
      ctgError("init hash[%d] for stablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum);
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }
  }

  if (output->metaNum == 2) {
    if (taosHashPut(pCatalog->tableCache.cache, output->ctbFname, strlen(output->ctbFname), &output->ctbMeta, sizeof(output->ctbMeta)) != 0) {
      ctgError("push ctable[%s] to table cache failed", output->ctbFname);
      goto error_exit;
    }

    if (TSDB_SUPER_TABLE != output->tbMeta->tableType) {
      ctgError("table type[%d] error, expected:%d", output->tbMeta->tableType, TSDB_SUPER_TABLE);
      goto error_exit;
    }
  }

  // Cache the header together with the trailing schema: ctgGetTableMetaFromCache
  // copies sizeof(STableMeta) + (tags + columns) * sizeof(SSchema) bytes out of the entry.
  int32_t tbSize = sizeof(*output->tbMeta) + (output->tbMeta->tableInfo.numOfTags + output->tbMeta->tableInfo.numOfColumns) * sizeof(SSchema);
  if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) {
    ctgError("push table[%s] to table cache failed", output->tbFname);
    goto error_exit;
  }

  if (TSDB_SUPER_TABLE == output->tbMeta->tableType) {
    // Index the copy held by the table cache, not output->tbMeta, which the
    // caller frees after this function returns.
    // NOTE(review): assumes the hash keeps the entry at a stable address — confirm.
    STableMeta *cachedMeta = taosHashGet(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname));
    if (NULL == cachedMeta) {
      ctgError("get table[%s] from table cache failed", output->tbFname);
      goto error_exit;
    }

    if (taosHashPut(pCatalog->tableCache.stableCache, &cachedMeta->suid, sizeof(cachedMeta->suid), &cachedMeta, POINTER_BYTES) != 0) {
      ctgError("push suid[%"PRIu64"] to stable cache failed", cachedMeta->suid);
      goto error_exit;
    }
  }

  return TSDB_CODE_SUCCESS;

error_exit:
  // NOTE(review): the failure path resets the vgroup cache rather than the
  // table caches; kept as in the original — confirm this is intended.
  if (pCatalog->vgroupCache.cache) {
    taosHashCleanup(pCatalog->vgroupCache.cache);
    pCatalog->vgroupCache.cache = NULL;
  }

  pCatalog->vgroupCache.vgroupVersion = CTG_DEFAULT_INVALID_VERSION;

  CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}

D
dapan1121 已提交
357
/*
 * Initialise the catalog module: create the cluster-handle hash and set the
 * module configuration.
 *
 * cfg  optional; copied into ctgMgmt.cfg when given. When NULL, vgroup caching
 *      is enabled and the default db / table-meta cache sizes are used.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_CTG_INTERNAL_ERROR (via CTG_ERR_LRET)
 * when the cluster hash cannot be created.
 */
int32_t catalogInit(SCatalogCfg *cfg) {
  ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
  if (ctgMgmt.pCluster == NULL) {
    CTG_ERR_LRET(TSDB_CODE_CTG_INTERNAL_ERROR, "init %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER);
  }

  if (NULL == cfg) {
    ctgMgmt.cfg.enableVgroupCache = true;
    ctgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
    ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TABLEMETA_NUMBER;
    return TSDB_CODE_SUCCESS;
  }

  memcpy(&ctgMgmt.cfg, cfg, sizeof(*cfg));

  return TSDB_CODE_SUCCESS;
}

374
int32_t catalogGetHandle(const char* clusterId , struct SCatalog** catalogHandle) {
D
dapan1121 已提交
375
  if (NULL == clusterId || NULL == catalogHandle) {
D
dapan1121 已提交
376
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan 已提交
377 378 379 380
  }

  if (NULL == ctgMgmt.pCluster) {
    ctgError("cluster cache are not ready");
D
dapan1121 已提交
381
    CTG_ERR_RET(TSDB_CODE_CTG_NOT_READY);
D
dapan 已提交
382 383 384 385 386 387
  }

  size_t clen = strlen(clusterId);
  SCatalog *clusterCtg = (SCatalog *)taosHashGet(ctgMgmt.pCluster, clusterId, clen);

  if (clusterCtg) {
D
dapan1121 已提交
388 389
    *catalogHandle = clusterCtg;
    return TSDB_CODE_SUCCESS;
D
dapan 已提交
390 391 392 393
  }

  clusterCtg = calloc(1, sizeof(*clusterCtg));
  if (NULL == clusterCtg) {
D
dapan1121 已提交
394
    ctgError("calloc %d failed", (int32_t)sizeof(*clusterCtg));
D
dapan1121 已提交
395
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan 已提交
396 397
  }

D
dapan1121 已提交
398 399
  clusterCtg->vgroupCache.vgroupVersion = CTG_DEFAULT_INVALID_VERSION;

D
dapan 已提交
400 401 402
  if (taosHashPut(ctgMgmt.pCluster, clusterId, clen, &clusterCtg, POINTER_BYTES)) {
    ctgError("put cluster %s cache to hash failed", clusterId);
    tfree(clusterCtg);
D
dapan1121 已提交
403
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan 已提交
404
  }
D
dapan1121 已提交
405 406

  *catalogHandle = clusterCtg;
D
dapan 已提交
407
  
D
dapan1121 已提交
408
  return TSDB_CODE_SUCCESS;
D
dapan 已提交
409 410
}

D
dapan1121 已提交
411 412
int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version) {
  if (NULL == pCatalog || NULL == dbName || NULL == version) {
D
dapan1121 已提交
413
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
414 415 416 417 418 419 420 421 422 423 424 425 426
  }

  if (NULL == pCatalog->dbCache.cache) {
    *version = CTG_DEFAULT_INVALID_VERSION;
    return TSDB_CODE_SUCCESS;
  }

  SDBVgroupInfo * dbInfo = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName));
  if (NULL == dbInfo) {
    *version = CTG_DEFAULT_INVALID_VERSION;
    return TSDB_CODE_SUCCESS;
  }

427
  *version = dbInfo->vgVersion;
D
dapan1121 已提交
428 429 430 431

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
432
/*
 * Insert, replace or remove the cached vgroup info of database `dbName`.
 *
 * A negative dbInfo->vgVersion removes the entry. Otherwise `*dbInfo` is
 * copied into the db cache (created on first use); when an older entry
 * exists, its vgInfo hash is cleaned up before being replaced.
 *
 * Returns TSDB_CODE_CTG_INVALID_INPUT for NULL arguments,
 * TSDB_CODE_CTG_MEM_ERROR when the cache cannot be created or the insert
 * fails, TSDB_CODE_SUCCESS otherwise.
 */
int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) {
  if (!(pCatalog && dbName && dbInfo)) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  size_t nameLen = strlen(dbName);

  if (dbInfo->vgVersion < 0) {
    if (pCatalog->dbCache.cache) {
      taosHashRemove(pCatalog->dbCache.cache, dbName, nameLen);
    }

    ctgWarn("remove db [%s] from cache", dbName);
    return TSDB_CODE_SUCCESS;
  }

  if (pCatalog->dbCache.cache != NULL) {
    // Release the vgroup hash held by the entry that is about to be overwritten.
    SDBVgroupInfo *stale = taosHashGet(pCatalog->dbCache.cache, dbName, nameLen);
    if (stale != NULL && stale->vgInfo != NULL) {
      taosHashCleanup(stale->vgInfo);
      stale->vgInfo = NULL;
    }
  } else {
    pCatalog->dbCache.cache = taosHashInit(CTG_DEFAULT_CACHE_DB_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
    if (NULL == pCatalog->dbCache.cache) {
      ctgError("init hash[%d] for db cache failed", CTG_DEFAULT_CACHE_DB_NUMBER);
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }
  }

  if (taosHashPut(pCatalog->dbCache.cache, dbName, nameLen, dbInfo, sizeof(*dbInfo)) != 0) {
    ctgError("push to vgroup hash cache failed");
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
  }

  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
469 470


471
/*
 * Get the vgroup info of database `dbName`, from the cache when allowed and
 * otherwise through a USE_DB request.
 *
 * forceUpdate  0 to try the cache first; any other value skips the cache
 * dbInfo       optional; receives a shallow struct copy of the vgroup info
 *
 * A fetched result is written to the db cache via catalogUpdateDBVgroupCache()
 * before it is returned.
 *
 * Returns TSDB_CODE_CTG_INVALID_INPUT when pCatalog, dbName, pRpc or pMgmtEps
 * is NULL; otherwise TSDB_CODE_SUCCESS or the error of a failing step.
 */
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo) {
  if (!(pCatalog && dbName && pRpc && pMgmtEps)) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  if (0 == forceUpdate) {
    int32_t cached = 0;

    CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &cached));
    if (cached) {
      return TSDB_CODE_SUCCESS;
    }
  }

  SBuildUseDBInput useDbReq = {0};
  SUseDbOutput     useDbRsp = {0};

  // Bounded copy of the db name; the last byte is forced to NUL in case of truncation.
  strncpy(useDbReq.db, dbName, sizeof(useDbReq.db));
  useDbReq.db[sizeof(useDbReq.db) - 1] = 0;
  useDbReq.vgVersion = CTG_DEFAULT_INVALID_VERSION;

  CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &useDbReq, &useDbRsp));
  CTG_ERR_RET(catalogUpdateDBVgroupCache(pCatalog, dbName, &useDbRsp.dbVgroup));

  if (dbInfo != NULL) {
    *dbInfo = useDbRsp.dbVgroup;
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
504
/*
 * Get a table's meta, using the cache when possible (forceUpdate = false).
 * Arguments and return value are those of ctgGetTableMetaImpl().
 */
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) {
  const bool forceUpdate = false;

  return ctgGetTableMetaImpl(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, forceUpdate, pTableMeta);
}
D
dapan1121 已提交
507

D
dapan1121 已提交
508 509
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName) {
  if (NULL == pCatalog || NULL == pDBName || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName) {
D
dapan1121 已提交
510
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan 已提交
511 512
  }

D
dapan1121 已提交
513 514
  SVgroupInfo vgroupInfo = {0};
  
D
dapan1121 已提交
515
  CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &vgroupInfo));
D
dapan1121 已提交
516

D
dapan1121 已提交
517 518 519
  STableMetaOutput output = {0};
  
  CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &vgroupInfo, &output));
D
dapan1121 已提交
520

D
dapan1121 已提交
521
  CTG_ERR_RET(ctgUpdateTableMetaCache(pCatalog, &output));
D
dapan1121 已提交
522

D
dapan1121 已提交
523 524
  tfree(output.tbMeta);
  
D
dapan1121 已提交
525
  return TSDB_CODE_SUCCESS;
526
}
527

D
dapan1121 已提交
528
/*
 * Get a table's meta after forcing a refresh of its cache entry
 * (forceUpdate = true). Arguments and return value are those of
 * ctgGetTableMetaImpl().
 */
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) {
  const bool forceUpdate = true;

  return ctgGetTableMetaImpl(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, forceUpdate, pTableMeta);
}

D
dapan1121 已提交
532
/*
 * Collect the vgroup(s) that hold the data of table "<pDBName>.<pTableName>"
 * into `pVgroupList`.
 *
 * For a super table every vgroup of the database is appended; for any other
 * table only the vgroup identified by the table meta's vgId is appended.
 *
 * Returns TSDB_CODE_CTG_INVALID_INPUT when any argument is NULL,
 * TSDB_CODE_CTG_INTERNAL_ERROR when the vgId is unknown or the append fails,
 * otherwise TSDB_CODE_SUCCESS or the error of the failing lookup. The table
 * meta fetched internally is released on every path past validation.
 */
int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray* pVgroupList) {
  if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == pVgroupList) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  STableMeta *tbMeta = NULL;
  int32_t code = 0;
  SVgroupInfo vgroupInfo = {0};
  SDBVgroupInfo dbVgroup = {0};

  CTG_ERR_JRET(catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &tbMeta));

  CTG_ERR_JRET(catalogGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbVgroup));

  if (tbMeta->tableType == TSDB_SUPER_TABLE) {
    CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, &dbVgroup, pVgroupList));
  } else {
    int32_t vgId = tbMeta->vgId;
    if (NULL == taosHashGetClone(dbVgroup.vgInfo, &vgId, sizeof(vgId), &vgroupInfo)) {
      ctgError("vgId[%d] not found in vgroup list", vgId);
      // Jump to the cleanup label: a plain return here would leak tbMeta.
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
    }

    if (NULL == taosArrayPush(pVgroupList, &vgroupInfo)) {
      ctgError("push vgroupInfo to array failed");
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
    }
  }

_return:
  tfree(tbMeta);

  CTG_RET(code);
}


568
/*
 * Determine the vgroup that table "<pDBName>.<pTableName>" hashes to and copy
 * it into `*pVgroup`.
 *
 * Returns TSDB_CODE_CTG_INVALID_INPUT when any argument is NULL,
 * TSDB_CODE_TSC_DB_NOT_SELECTED when the database's vgroup info is invalid,
 * otherwise TSDB_CODE_SUCCESS or the error of the failing lookup.
 */
int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter, const SEpSet *pMgmtEps, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) {
  // pTableName and pVgroup are used unchecked further down (name formatting,
  // result copy), so validate everything up front like the other public entry points.
  if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == pVgroup) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  SDBVgroupInfo dbInfo = {0};

  CTG_ERR_RET(catalogGetDBVgroup(pCatalog, pTransporter, pMgmtEps, pDBName, false, &dbInfo));

  if (dbInfo.vgVersion < 0 || NULL == dbInfo.vgInfo) {
    ctgError("db[%s] vgroup cache invalid, vgroup version:%d, vgInfo:%p", pDBName, dbInfo.vgVersion, dbInfo.vgInfo);
    CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED);
  }

  CTG_ERR_RET(ctgGetVgInfoFromHashValue(&dbInfo, pDBName, pTableName, pVgroup));

  CTG_RET(TSDB_CODE_SUCCESS);
}


D
dapan1121 已提交
586 587
/*
 * Resolve all metadata requested in `pReq` and store the results in `pRsp`.
 *
 * Currently only table names are handled: for every SName in
 * pReq->pTableName the table meta is fetched (db key "<acctId>.<dbname>") and
 * its pointer is appended to pRsp->pTableMeta, in request order.
 *
 * On failure every meta collected so far is freed, the result array is
 * destroyed and pRsp->pTableMeta is reset to NULL.
 *
 * Returns TSDB_CODE_CTG_INVALID_INPUT when any argument is NULL,
 * TSDB_CODE_CTG_MEM_ERROR on allocation failure, otherwise TSDB_CODE_SUCCESS
 * or the error of the failing lookup.
 */
int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp) {
  if (NULL == pCatalog || NULL == pRpc  || NULL == pMgmtEps || NULL == pReq || NULL == pRsp) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  int32_t code = 0;

  if (pReq->pTableName) {
    char dbName[TSDB_FULL_DB_NAME_LEN];
    int32_t tbNum = (int32_t)taosArrayGetSize(pReq->pTableName);
    if (tbNum > 0) {
      pRsp->pTableMeta = taosArrayInit(tbNum, POINTER_BYTES);
      if (NULL == pRsp->pTableMeta) {
        ctgError("taosArrayInit num[%d] failed", tbNum);
        CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
      }
    }

    for (int32_t i = 0; i < tbNum; ++i) {
      SName *name = taosArrayGet(pReq->pTableName, i);
      STableMeta *pTableMeta = NULL;

      snprintf(dbName, sizeof(dbName), "%d.%s", name->acctId, name->dbname);

      CTG_ERR_JRET(catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, dbName, name->tname, &pTableMeta));

      if (NULL == taosArrayPush(pRsp->pTableMeta, &pTableMeta)) {
        ctgError("taosArrayPush failed, idx:%d", i);
        // Not in the array yet, so the cleanup below would miss it.
        tfree(pTableMeta);
        CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
      }
    }
  }

  return TSDB_CODE_SUCCESS;

_return:

  if (pRsp->pTableMeta) {
    int32_t aSize = (int32_t)taosArrayGetSize(pRsp->pTableMeta);
    for (int32_t i = 0; i < aSize; ++i) {
      STableMeta *pMeta = taosArrayGetP(pRsp->pTableMeta, i);
      tfree(pMeta);
    }

    taosArrayDestroy(pRsp->pTableMeta);
    // Do not leave a pointer to the destroyed array in the caller's struct.
    pRsp->pTableMeta = NULL;
  }

  CTG_RET(code);
}
D
dapan 已提交
636

D
dapan 已提交
637 638 639 640 641 642 643 644 645 646
/*
 * Placeholder for retrieving the qnode endpoint set: only validates its
 * arguments and leaves `pQnodeEpSet` untouched.
 *
 * Returns TSDB_CODE_CTG_INVALID_INPUT when any argument is NULL,
 * TSDB_CODE_SUCCESS otherwise.
 */
int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SEpSet* pQnodeEpSet) {
  if (!(pCatalog && pRpc && pMgmtEps && pQnodeEpSet)) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  return TSDB_CODE_SUCCESS;
}


D
dapan 已提交
647 648 649 650 651 652 653 654 655
/*
 * Tear down the catalog module by cleaning up the cluster-handle hash.
 * Does nothing when the hash was never created or was already destroyed.
 */
void catalogDestroy(void) {
  if (NULL == ctgMgmt.pCluster) {
    return;
  }

  // TBD (from the original author).
  // NOTE(review): the hash stores SCatalog pointers (see catalogGetHandle);
  // the catalogs themselves do not appear to be released here — confirm.
  taosHashCleanup(ctgMgmt.pCluster);
  ctgMgmt.pCluster = NULL;
}