catalog.c 22.0 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
H
Haojun Liao 已提交
14 15 16
 */

#include "catalogInt.h"
D
dapan1121 已提交
17
#include "trpc.h"
D
dapan1121 已提交
18
#include "query.h"
D
dapan1121 已提交
19
#include "tname.h"
20

D
dapan1121 已提交
21 22
SCatalogMgmt ctgMgmt = {0};

23
int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, SDBVgroupInfo *dbInfo, int32_t *exist) {
D
dapan1121 已提交
24 25 26 27 28
  if (NULL == pCatalog->dbCache.cache) {
    *exist = 0;
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
29 30
  SDBVgroupInfo *info = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName));

31
  if (NULL == info) {
D
dapan1121 已提交
32 33 34
    *exist = 0;
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
35 36

  if (dbInfo) {
37
    *dbInfo = *info;
D
dapan1121 已提交
38
  }
D
dapan1121 已提交
39

D
dapan1121 已提交
40
  *exist = 1;
D
dapan1121 已提交
41 42 43 44 45 46 47 48 49 50 51
  
  return TSDB_CODE_SUCCESS;
}



int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SBuildUseDBInput *input, SUseDbOutput *out) {
  char *msg = NULL;
  SEpSet *pVnodeEpSet = NULL;
  int32_t msgLen = 0;

D
catalog  
dapan1121 已提交
52
  CTG_ERR_RET(queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)](input, &msg, 0, &msgLen));
D
ut test  
dapan1121 已提交
53
  
D
dapan1121 已提交
54
  SRpcMsg rpcMsg = {
H
Hongze Cheng 已提交
55
      .msgType = TDMT_MND_USE_DB,
D
catalog  
dapan1121 已提交
56
      .pCont   = msg,
D
dapan1121 已提交
57 58 59 60 61 62
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};

  rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
63 64
  if (TSDB_CODE_SUCCESS != rpcRsp.code) {
    ctgError("error rsp for use db, code:%x", rpcRsp.code);
D
dapan1121 已提交
65
    CTG_ERR_RET(rpcRsp.code);
D
dapan1121 已提交
66
  }
D
dapan1121 已提交
67

D
catalog  
dapan1121 已提交
68
  CTG_ERR_RET(queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)](out, rpcRsp.pCont, rpcRsp.contLen));
D
dapan1121 已提交
69

D
dapan1121 已提交
70 71
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
72 73


D
dapan1121 已提交
74
int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const char *dbName, const char* pTableName, STableMeta** pTableMeta, int32_t *exist) {
D
dapan1121 已提交
75 76 77 78 79
  if (NULL == pCatalog->tableCache.cache) {
    *exist = 0;
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
80
  char tbFullName[TSDB_TABLE_FNAME_LEN];
D
dapan1121 已提交
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99

  snprintf(tbFullName, sizeof(tbFullName), "%s.%s", dbName, pTableName);

  STableMeta *tbMeta = taosHashGet(pCatalog->tableCache.cache, tbFullName, strlen(tbFullName));

  if (NULL == tbMeta) {
    *exist = 0;
    return TSDB_CODE_SUCCESS;
  }

  if (tbMeta->tableType == TSDB_CHILD_TABLE) {
    STableMeta **stbMeta = taosHashGet(pCatalog->tableCache.stableCache, &tbMeta->suid, sizeof(tbMeta->suid));
    if (NULL == stbMeta || NULL == *stbMeta) {
      *exist = 0;
      return TSDB_CODE_SUCCESS;
    }

    if ((*stbMeta)->suid != tbMeta->suid) {
      ctgError("stable cache error, expected suid:%"PRId64 ",actual suid:%"PRId64, tbMeta->suid, (*stbMeta)->suid);
D
dapan1121 已提交
100
      CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
101 102 103 104 105 106
    }

    int32_t metaSize = sizeof(STableMeta) + ((*stbMeta)->tableInfo.numOfTags + (*stbMeta)->tableInfo.numOfColumns) * sizeof(SSchema);
    *pTableMeta = calloc(1, metaSize);
    if (NULL == *pTableMeta) {
      ctgError("calloc size[%d] failed", metaSize);
D
dapan1121 已提交
107
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
108 109 110 111 112 113 114 115 116
    }

    memcpy(*pTableMeta, tbMeta, sizeof(SCTableMeta));
    memcpy(&(*pTableMeta)->sversion, &(*stbMeta)->sversion, metaSize - sizeof(SCTableMeta));
  } else {
    int32_t metaSize = sizeof(STableMeta) + (tbMeta->tableInfo.numOfTags + tbMeta->tableInfo.numOfColumns) * sizeof(SSchema);
    *pTableMeta = calloc(1, metaSize);
    if (NULL == *pTableMeta) {
      ctgError("calloc size[%d] failed", metaSize);
D
dapan1121 已提交
117
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137
    }

    memcpy(*pTableMeta, tbMeta, metaSize);
  }

  *exist = 1;
  
  return TSDB_CODE_SUCCESS;
}

void ctgGenEpSet(SEpSet *epSet, SVgroupInfo *vgroupInfo) {
  epSet->inUse = 0;
  epSet->numOfEps = vgroupInfo->numOfEps;

  for (int32_t i = 0; i < vgroupInfo->numOfEps; ++i) {
    memcpy(&epSet->port[i], &vgroupInfo->epAddr[i].port, sizeof(epSet->port[i]));
    memcpy(&epSet->fqdn[i], &vgroupInfo->epAddr[i].fqdn, sizeof(epSet->fqdn[i]));
  }
}

D
dapan1121 已提交
138 139 140 141 142 143 144 145 146 147 148 149 150 151
int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char *pDBName, const char* pTableName, STableMetaOutput* output) {
  if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == output) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  char tbFullName[TSDB_TABLE_FNAME_LEN];

  snprintf(tbFullName, sizeof(tbFullName), "%s.%s", pDBName, pTableName);

  SBuildTableMetaInput bInput = {.vgId = 0, .tableFullName = tbFullName};
  char *msg = NULL;
  SEpSet *pVnodeEpSet = NULL;
  int32_t msgLen = 0;

D
catalog  
dapan1121 已提交
152
  CTG_ERR_RET(queryBuildMsg[TMSG_INDEX(TDMT_MND_STB_META)](&bInput, &msg, 0, &msgLen));
D
dapan1121 已提交
153 154 155 156 157 158

  SRpcMsg rpcMsg = {
      .msgType = TDMT_MND_STB_META,
      .pCont   = msg,
      .contLen = msgLen,
  };
D
dapan1121 已提交
159

D
dapan1121 已提交
160 161 162 163 164 165 166 167 168
  SRpcMsg rpcRsp = {0};

  rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
  
  if (TSDB_CODE_SUCCESS != rpcRsp.code) {
    ctgError("error rsp for table meta, code:%x", rpcRsp.code);
    CTG_ERR_RET(rpcRsp.code);
  }

D
catalog  
dapan1121 已提交
169
  CTG_ERR_RET(queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_STB_META)](output, rpcRsp.pCont, rpcRsp.contLen));
D
dapan1121 已提交
170 171 172 173 174 175

  return TSDB_CODE_SUCCESS;
}


int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char *pDBName, const char* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) {
D
dapan1121 已提交
176
  if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == vgroupInfo || NULL == output) {
D
dapan1121 已提交
177
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
178 179
  }

D
dapan1121 已提交
180
  char tbFullName[TSDB_TABLE_FNAME_LEN];
D
dapan1121 已提交
181 182 183 184 185 186 187 188

  snprintf(tbFullName, sizeof(tbFullName), "%s.%s", pDBName, pTableName);

  SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .tableFullName = tbFullName};
  char *msg = NULL;
  SEpSet *pVnodeEpSet = NULL;
  int32_t msgLen = 0;

D
catalog  
dapan1121 已提交
189
  CTG_ERR_RET(queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)](&bInput, &msg, 0, &msgLen));
D
dapan1121 已提交
190 191

  SRpcMsg rpcMsg = {
H
Hongze Cheng 已提交
192
      .msgType = TDMT_VND_TABLE_META,
D
dapan1121 已提交
193 194 195 196 197 198 199 200 201 202
      .pCont   = msg,
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
  SEpSet  epSet;
  
  ctgGenEpSet(&epSet, vgroupInfo);

  rpcSendRecv(pRpc, &epSet, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
203
  
D
dapan1121 已提交
204
  if (TSDB_CODE_SUCCESS != rpcRsp.code) {
D
dapan1121 已提交
205
    ctgError("error rsp for table meta, code:%x", rpcRsp.code);
D
dapan1121 已提交
206
    CTG_ERR_RET(rpcRsp.code);
D
dapan1121 已提交
207 208
  }

D
catalog  
dapan1121 已提交
209
  CTG_ERR_RET(queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)](output, rpcRsp.pCont, rpcRsp.contLen));
D
dapan1121 已提交
210 211 212 213 214

  return TSDB_CODE_SUCCESS;
}


215 216
int32_t ctgGetHashFunction(int8_t hashMethod, tableNameHashFp *fp) {
  switch (hashMethod) {
D
dapan1121 已提交
217 218 219 220 221 222 223 224
    default:
      *fp = MurmurHash3_32;
      break;
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
225
int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, SDBVgroupInfo *dbInfo, SArray** vgroupList) {
D
dapan1121 已提交
226
  SHashObj *vgroupHash = NULL;
227 228
  SVgroupInfo *vgInfo = NULL;

D
dapan 已提交
229 230 231 232 233 234
  *vgroupList = taosArrayInit(taosHashGetSize(dbInfo->vgInfo), sizeof(SVgroupInfo));
  if (NULL == *vgroupList) {
    ctgError("taosArrayInit failed");
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);    
  }

235 236 237
  void *pIter = taosHashIterate(dbInfo->vgInfo, NULL);
  while (pIter) {
    vgInfo = pIter;
D
dapan1121 已提交
238

D
dapan 已提交
239
    if (NULL == taosArrayPush(*vgroupList, vgInfo)) {
240
      ctgError("taosArrayPush failed");
D
dapan1121 已提交
241
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
242 243 244 245
    }
    
    pIter = taosHashIterate(dbInfo->vgInfo, pIter);
    vgInfo = NULL;
D
dapan1121 已提交
246 247 248 249 250
  }

  return TSDB_CODE_SUCCESS;
}

251 252 253 254
int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) {
  int32_t vgNum = taosHashGetSize(dbInfo->vgInfo);
  if (vgNum <= 0) {
    ctgError("db[%s] vgroup cache invalid, vgroup number:%d", pDBName, vgNum);
D
dapan1121 已提交
255
    CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED);
D
dapan1121 已提交
256 257
  }

258 259
  tableNameHashFp fp = NULL;
  SVgroupInfo *vgInfo = NULL;
D
dapan1121 已提交
260

261 262 263 264 265 266 267 268 269 270 271 272 273
  CTG_ERR_RET(ctgGetHashFunction(dbInfo->hashMethod, &fp));

  char tbFullName[TSDB_TABLE_FNAME_LEN];

  snprintf(tbFullName, sizeof(tbFullName), "%s.%s", pDBName, pTableName);

  uint32_t hashValue = (*fp)(tbFullName, (uint32_t)strlen(tbFullName));

  void *pIter = taosHashIterate(dbInfo->vgInfo, NULL);
  while (pIter) {
    vgInfo = pIter;
    if (hashValue >= vgInfo->hashBegin && hashValue <= vgInfo->hashEnd) {
      break;
D
dapan1121 已提交
274
    }
275 276 277
    
    pIter = taosHashIterate(dbInfo->vgInfo, pIter);
    vgInfo = NULL;
D
dapan1121 已提交
278 279
  }

280 281
  if (NULL == vgInfo) {
    ctgError("no hash range found for hashvalue[%u]", hashValue);
D
dapan1121 已提交
282
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
283 284 285 286
  }

  *pVgroup = *vgInfo;

D
dapan1121 已提交
287 288 289 290
  return TSDB_CODE_SUCCESS;
}

int32_t ctgGetTableMetaImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, bool forceUpdate, STableMeta** pTableMeta) {
D
dapan1121 已提交
291
  if (NULL == pCatalog || NULL == pDBName || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) {
D
dapan1121 已提交
292
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310
  }

  int32_t exist = 0;

  if (!forceUpdate) {  
    CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pDBName, pTableName, pTableMeta, &exist));

    if (exist) {
      return TSDB_CODE_SUCCESS;
    }
  }

  CTG_ERR_RET(catalogRenewTableMeta(pCatalog, pRpc, pMgmtEps, pDBName, pTableName));

  CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pDBName, pTableName, pTableMeta, &exist));

  if (0 == exist) {
    ctgError("get table meta from cache failed, but fetch succeed");
D
dapan1121 已提交
311
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
312 313 314 315 316 317
  }
  
  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
318
int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output) {
D
dapan1121 已提交
319 320
  if (output->metaNum != 1 && output->metaNum != 2) {
    ctgError("invalid table meta number[%d] got from meta rsp", output->metaNum);
D
dapan1121 已提交
321
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
322 323 324 325
  }

  if (NULL == output->tbMeta) {
    ctgError("no valid table meta got from meta rsp");
D
dapan1121 已提交
326
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
327 328 329 330 331 332
  }

  if (NULL == pCatalog->tableCache.cache) {
    pCatalog->tableCache.cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
    if (NULL == pCatalog->tableCache.cache) {
      ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum);
D
dapan1121 已提交
333
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
334
    }
D
dapan1121 已提交
335
  }
D
dapan1121 已提交
336

D
dapan1121 已提交
337
  if (NULL == pCatalog->tableCache.stableCache) {
D
dapan1121 已提交
338 339 340
    pCatalog->tableCache.stableCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
    if (NULL == pCatalog->tableCache.stableCache) {
      ctgError("init hash[%d] for stablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum);
D
dapan1121 已提交
341
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
342 343 344 345 346 347
    }
  }

  if (output->metaNum == 2) {
    if (taosHashPut(pCatalog->tableCache.cache, output->ctbFname, strlen(output->ctbFname), &output->ctbMeta, sizeof(output->ctbMeta)) != 0) {
      ctgError("push ctable[%s] to table cache failed", output->ctbFname);
D
dapan1121 已提交
348
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
349 350 351 352
    }

    if (TSDB_SUPER_TABLE != output->tbMeta->tableType) {
      ctgError("table type[%d] error, expected:%d", output->tbMeta->tableType, TSDB_SUPER_TABLE);
D
dapan1121 已提交
353
      CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
354 355 356
    }    
  }

D
dapan1121 已提交
357 358
  int32_t tbSize = sizeof(*output->tbMeta) + sizeof(SSchema) * (output->tbMeta->tableInfo.numOfColumns + output->tbMeta->tableInfo.numOfTags);
  if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) {
D
dapan1121 已提交
359
    ctgError("push table[%s] to table cache failed", output->tbFname);
D
dapan1121 已提交
360
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
361 362 363 364 365
  }

  if (TSDB_SUPER_TABLE == output->tbMeta->tableType) {
    if (taosHashPut(pCatalog->tableCache.stableCache, &output->tbMeta->suid, sizeof(output->tbMeta->suid), &output->tbMeta, POINTER_BYTES) != 0) {
      ctgError("push suid[%"PRIu64"] to stable cache failed", output->tbMeta->suid);
D
dapan1121 已提交
366
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
367 368 369 370 371 372
    }
  }
  
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407

int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo) {
  if (NULL == pCatalog || NULL == dbName || NULL == pRpc || NULL == pMgmtEps) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  int32_t exist = 0;

  if (0 == forceUpdate) {
    CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &exist));

    if (exist) {
      return TSDB_CODE_SUCCESS;
    }
  }

  SUseDbOutput DbOut = {0};
  SBuildUseDBInput input = {0};

  strncpy(input.db, dbName, sizeof(input.db));
  input.db[sizeof(input.db) - 1] = 0;
  input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
  
  CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &input, &DbOut));

  CTG_ERR_RET(catalogUpdateDBVgroup(pCatalog, dbName, &DbOut.dbVgroup));

  if (dbInfo) {
    *dbInfo = DbOut.dbVgroup;
  }

  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
408
int32_t catalogInit(SCatalogCfg *cfg) {
D
dapan1121 已提交
409 410 411
  if (ctgMgmt.pCluster) {
    ctgError("catalog already init");
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
412 413 414 415
  }

  if (cfg) {
    memcpy(&ctgMgmt.cfg, cfg, sizeof(*cfg));
D
dapan1121 已提交
416 417 418 419 420 421 422 423
    
    if (ctgMgmt.cfg.maxDBCacheNum == 0) {
      ctgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
    }

    if (ctgMgmt.cfg.maxTblCacheNum == 0) {
      ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TABLEMETA_NUMBER;
    }
D
dapan1121 已提交
424 425 426
  } else {
    ctgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
    ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TABLEMETA_NUMBER;
D
dapan 已提交
427 428
  }

D
dapan1121 已提交
429 430 431
  ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
  if (NULL == ctgMgmt.pCluster) {
    CTG_ERR_LRET(TSDB_CODE_CTG_INTERNAL_ERROR, "init %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER);
D
dapan1121 已提交
432 433
  }

D
dapan 已提交
434
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
435 436
}

437
int32_t catalogGetHandle(const char* clusterId , struct SCatalog** catalogHandle) {
D
dapan1121 已提交
438
  if (NULL == clusterId || NULL == catalogHandle) {
D
dapan1121 已提交
439
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan 已提交
440 441 442 443
  }

  if (NULL == ctgMgmt.pCluster) {
    ctgError("cluster cache are not ready");
D
dapan1121 已提交
444
    CTG_ERR_RET(TSDB_CODE_CTG_NOT_READY);
D
dapan 已提交
445 446 447
  }

  size_t clen = strlen(clusterId);
D
catalog  
dapan1121 已提交
448
  SCatalog **ctg = (SCatalog **)taosHashGet(ctgMgmt.pCluster, clusterId, clen);
D
dapan 已提交
449

D
catalog  
dapan1121 已提交
450 451
  if (ctg && (*ctg)) {
    *catalogHandle = *ctg;
D
dapan1121 已提交
452
    return TSDB_CODE_SUCCESS;
D
dapan 已提交
453 454
  }

D
catalog  
dapan1121 已提交
455
  SCatalog *clusterCtg = calloc(1, sizeof(SCatalog));
D
dapan 已提交
456
  if (NULL == clusterCtg) {
D
catalog  
dapan1121 已提交
457
    ctgError("calloc %d failed", (int32_t)sizeof(SCatalog));
D
dapan1121 已提交
458
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan 已提交
459 460 461 462 463
  }

  if (taosHashPut(ctgMgmt.pCluster, clusterId, clen, &clusterCtg, POINTER_BYTES)) {
    ctgError("put cluster %s cache to hash failed", clusterId);
    tfree(clusterCtg);
D
dapan1121 已提交
464
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan 已提交
465
  }
D
dapan1121 已提交
466 467

  *catalogHandle = clusterCtg;
D
dapan 已提交
468
  
D
dapan1121 已提交
469
  return TSDB_CODE_SUCCESS;
D
dapan 已提交
470 471
}

D
dapan1121 已提交
472 473
int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version) {
  if (NULL == pCatalog || NULL == dbName || NULL == version) {
D
dapan1121 已提交
474
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
475 476 477 478 479 480 481 482 483 484 485 486 487
  }

  if (NULL == pCatalog->dbCache.cache) {
    *version = CTG_DEFAULT_INVALID_VERSION;
    return TSDB_CODE_SUCCESS;
  }

  SDBVgroupInfo * dbInfo = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName));
  if (NULL == dbInfo) {
    *version = CTG_DEFAULT_INVALID_VERSION;
    return TSDB_CODE_SUCCESS;
  }

488
  *version = dbInfo->vgVersion;
D
dapan1121 已提交
489 490 491 492

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
493
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) {
D
dapan1121 已提交
494
  if (NULL == pCatalog || NULL == dbName || NULL == dbInfo) {
D
dapan1121 已提交
495
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
496 497
  }

498
  if (dbInfo->vgVersion < 0) {
D
dapan1121 已提交
499
    if (pCatalog->dbCache.cache) {
D
dapan1121 已提交
500 501 502 503 504 505
      SDBVgroupInfo *oldInfo = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName));
      if (oldInfo && oldInfo->vgInfo) {
        taosHashCleanup(oldInfo->vgInfo);
        oldInfo->vgInfo = NULL;
      }
    
D
dapan1121 已提交
506 507 508 509 510 511
      taosHashRemove(pCatalog->dbCache.cache, dbName, strlen(dbName));
    }
    
    ctgWarn("remove db [%s] from cache", dbName);
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
512

D
dapan1121 已提交
513
  if (NULL == pCatalog->dbCache.cache) {
D
dapan1121 已提交
514
    pCatalog->dbCache.cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
D
dapan1121 已提交
515
    if (NULL == pCatalog->dbCache.cache) {
D
dapan1121 已提交
516
      ctgError("init hash[%d] for db cache failed", CTG_DEFAULT_CACHE_DB_NUMBER);
D
dapan1121 已提交
517
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
518
    }
519 520 521 522 523 524
  } else {
    SDBVgroupInfo *oldInfo = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName));
    if (oldInfo && oldInfo->vgInfo) {
      taosHashCleanup(oldInfo->vgInfo);
      oldInfo->vgInfo = NULL;
    }
D
dapan1121 已提交
525 526 527 528
  }

  if (taosHashPut(pCatalog->dbCache.cache, dbName, strlen(dbName), dbInfo, sizeof(*dbInfo)) != 0) {
    ctgError("push to vgroup hash cache failed");
D
dapan1121 已提交
529
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
530 531 532
  }

  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
533 534
}

535 536
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) {
  return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pDBName, pTableName, false, pTableMeta);
D
dapan1121 已提交
537
}
D
dapan1121 已提交
538

D
dapan1121 已提交
539 540
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName) {
  if (NULL == pCatalog || NULL == pDBName || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName) {
D
dapan1121 已提交
541
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan 已提交
542 543
  }

D
dapan1121 已提交
544
  SVgroupInfo vgroupInfo = {0};
D
dapan1121 已提交
545
  int32_t code = 0;
D
dapan1121 已提交
546
  
D
dapan1121 已提交
547
  CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &vgroupInfo));
D
dapan1121 已提交
548

D
dapan1121 已提交
549 550
  STableMetaOutput output = {0};
  
D
dapan1121 已提交
551 552 553
  //CTG_ERR_RET(ctgGetTableMetaFromVnode(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &vgroupInfo, &output));

  CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &output));
D
dapan1121 已提交
554

D
dapan1121 已提交
555 556 557
  CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, &output));

_return:
D
dapan1121 已提交
558

D
dapan1121 已提交
559 560
  tfree(output.tbMeta);
  
D
dapan1121 已提交
561
  CTG_RET(code);
562
}
563

D
dapan1121 已提交
564
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) {
D
dapan1121 已提交
565 566 567
  return ctgGetTableMetaImpl(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, true, pTableMeta);
}

D
dapan 已提交
568
int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray** pVgroupList) {
D
dapan1121 已提交
569
  if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == pVgroupList) {
D
dapan1121 已提交
570
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
571 572 573 574 575
  }
  
  STableMeta *tbMeta = NULL;
  int32_t code = 0;
  SVgroupInfo vgroupInfo = {0};
576
  SDBVgroupInfo dbVgroup = {0};
D
dapan1121 已提交
577 578 579
  
  CTG_ERR_JRET(catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &tbMeta));

D
dapan1121 已提交
580
  CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbVgroup));
D
dapan 已提交
581

582 583
  if (tbMeta->tableType == TSDB_SUPER_TABLE) {
    CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, &dbVgroup, pVgroupList));
D
dapan1121 已提交
584
  } else {
585 586 587
    int32_t vgId = tbMeta->vgId;
    if (NULL == taosHashGetClone(dbVgroup.vgInfo, &vgId, sizeof(vgId), &vgroupInfo)) {
      ctgError("vgId[%d] not found in vgroup list", vgId);
D
dapan 已提交
588
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);    
589
    }
D
dapan1121 已提交
590

D
dapan 已提交
591 592 593 594 595 596 597
    *pVgroupList = taosArrayInit(1, sizeof(SVgroupInfo));
    if (NULL == *pVgroupList) {
      ctgError("taosArrayInit failed");
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);    
    }

    if (NULL == taosArrayPush(*pVgroupList, &vgroupInfo)) {
D
dapan1121 已提交
598 599 600 601 602
      ctgError("push vgroupInfo to array failed");
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
    }
  }

D
dapan 已提交
603 604 605 606
  tfree(tbMeta);

  return TSDB_CODE_SUCCESS;

D
dapan1121 已提交
607 608
_return:
  tfree(tbMeta);
D
dapan 已提交
609 610

  taosArrayDestroy(*pVgroupList);
D
dapan1121 已提交
611
  *pVgroupList = NULL;
D
dapan1121 已提交
612
  
D
dapan1121 已提交
613
  CTG_RET(code);
D
dapan1121 已提交
614 615 616
}


617
int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter, const SEpSet *pMgmtEps, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) {
D
dapan1121 已提交
618 619 620 621
  SDBVgroupInfo dbInfo = {0};
  int32_t code = 0;
  int32_t vgId = 0;

D
dapan1121 已提交
622
  CTG_ERR_RET(ctgGetDBVgroup(pCatalog, pTransporter, pMgmtEps, pDBName, false, &dbInfo));
D
dapan1121 已提交
623 624 625

  if (dbInfo.vgVersion < 0 || NULL == dbInfo.vgInfo) {
    ctgError("db[%s] vgroup cache invalid, vgroup version:%d, vgInfo:%p", pDBName, dbInfo.vgVersion, dbInfo.vgInfo);
D
dapan1121 已提交
626
    CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED);
D
dapan1121 已提交
627 628 629 630
  }

  CTG_ERR_RET(ctgGetVgInfoFromHashValue(&dbInfo, pDBName, pTableName, pVgroup));

D
dapan1121 已提交
631
  CTG_RET(code);
D
dapan1121 已提交
632 633 634
}


D
dapan1121 已提交
635 636
int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp) {
  if (NULL == pCatalog || NULL == pRpc  || NULL == pMgmtEps || NULL == pReq || NULL == pRsp) {
D
dapan1121 已提交
637
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan 已提交
638
  }
D
dapan1121 已提交
639 640 641 642

  int32_t code = 0;

  if (pReq->pTableName) {
643
    char dbName[TSDB_DB_FNAME_LEN];
D
dapan1121 已提交
644
    int32_t tbNum = (int32_t)taosArrayGetSize(pReq->pTableName);
D
dapan1121 已提交
645 646 647 648 649 650 651 652 653
    if (tbNum <= 0) {
      ctgError("empty table name list");
      CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
    }
    
    pRsp->pTableMeta = taosArrayInit(tbNum, POINTER_BYTES);
    if (NULL == pRsp->pTableMeta) {
      ctgError("taosArrayInit num[%d] failed", tbNum);
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
654 655 656 657 658 659
    }
    
    for (int32_t i = 0; i < tbNum; ++i) {
      SName *name = taosArrayGet(pReq->pTableName, i);
      STableMeta *pTableMeta = NULL;
      
H
Haojun Liao 已提交
660
      snprintf(dbName, sizeof(dbName), "%d.%s", name->acctId, name->dbname);
D
dapan1121 已提交
661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683

      CTG_ERR_JRET(catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, dbName, name->tname, &pTableMeta));

      if (NULL == taosArrayPush(pRsp->pTableMeta, &pTableMeta)) {
        ctgError("taosArrayPush failed, idx:%d", i);
        tfree(pTableMeta);
        CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
      }
    }
  }

  return TSDB_CODE_SUCCESS;

_return:  

  if (pRsp->pTableMeta) {
    int32_t aSize = taosArrayGetSize(pRsp->pTableMeta);
    for (int32_t i = 0; i < aSize; ++i) {
      STableMeta *pMeta = taosArrayGetP(pRsp->pTableMeta, i);
      tfree(pMeta);
    }
    
    taosArrayDestroy(pRsp->pTableMeta);
D
dapan1121 已提交
684
    pRsp->pTableMeta = NULL;
D
dapan1121 已提交
685
  }
D
dapan 已提交
686
  
D
dapan1121 已提交
687
  CTG_RET(code);
688
}
D
dapan 已提交
689

D
dapan1121 已提交
690 691
int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray* pQnodeList) {
  if (NULL == pCatalog || NULL == pRpc  || NULL == pMgmtEps || NULL == pQnodeList) {
D
dapan 已提交
692 693 694 695 696 697 698 699
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }


  return TSDB_CODE_SUCCESS;
}


D
dapan 已提交
700 701 702 703 704 705 706 707 708
void catalogDestroy(void) {
  if (ctgMgmt.pCluster) {
    taosHashCleanup(ctgMgmt.pCluster); //TBD
    ctgMgmt.pCluster = NULL;
  }
}