catalog.c 24.7 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
H
Haojun Liao 已提交
14 15 16
 */

#include "catalogInt.h"
D
dapan1121 已提交
17
#include "trpc.h"
D
dapan1121 已提交
18
#include "query.h"
D
dapan1121 已提交
19
#include "tname.h"
20

D
dapan1121 已提交
21 22
SCatalogMgmt ctgMgmt = {0};

D
dapan1121 已提交
23
int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, SDBVgroupInfo **dbInfo, bool *inCache) {
D
dapan1121 已提交
24
  if (NULL == pCatalog->dbCache.cache) {
D
dapan1121 已提交
25
    *inCache = false;
D
dapan1121 已提交
26 27 28
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
29
  SDBVgroupInfo *info = taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName));
D
dapan1121 已提交
30

31
  if (NULL == info) {
D
dapan1121 已提交
32
    *inCache = false;
D
dapan1121 已提交
33 34
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
35

D
dapan1121 已提交
36 37 38 39 40
  CTG_LOCK(CTG_READ, &info->lock);
  if (NULL == info->vgInfo) {
    CTG_UNLOCK(CTG_READ, &info->lock);
    *inCache = false;
    return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
41
  }
D
dapan1121 已提交
42

D
dapan1121 已提交
43 44
  *dbInfo = info;
  *inCache = true;
D
dapan1121 已提交
45 46 47 48 49 50 51 52 53 54 55
  
  return TSDB_CODE_SUCCESS;
}



int32_t ctgGetDBVgroupFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SBuildUseDBInput *input, SUseDbOutput *out) {
  char *msg = NULL;
  SEpSet *pVnodeEpSet = NULL;
  int32_t msgLen = 0;

D
catalog  
dapan1121 已提交
56
  CTG_ERR_RET(queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)](input, &msg, 0, &msgLen));
D
ut test  
dapan1121 已提交
57
  
D
dapan1121 已提交
58
  SRpcMsg rpcMsg = {
H
Hongze Cheng 已提交
59
      .msgType = TDMT_MND_USE_DB,
D
catalog  
dapan1121 已提交
60
      .pCont   = msg,
D
dapan1121 已提交
61 62 63 64 65 66
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};

  rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
67 68
  if (TSDB_CODE_SUCCESS != rpcRsp.code) {
    ctgError("error rsp for use db, code:%x", rpcRsp.code);
D
dapan1121 已提交
69
    CTG_ERR_RET(rpcRsp.code);
D
dapan1121 已提交
70
  }
D
dapan1121 已提交
71

D
catalog  
dapan1121 已提交
72
  CTG_ERR_RET(queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)](out, rpcRsp.pCont, rpcRsp.contLen));
D
dapan1121 已提交
73

D
dapan1121 已提交
74 75
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
76 77


D
dapan1121 已提交
78
int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const char *dbName, const char* pTableName, STableMeta** pTableMeta, int32_t *exist) {
D
dapan1121 已提交
79 80 81 82 83
  if (NULL == pCatalog->tableCache.cache) {
    *exist = 0;
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
84
  char tbFullName[TSDB_TABLE_FNAME_LEN];
D
dapan1121 已提交
85 86 87

  snprintf(tbFullName, sizeof(tbFullName), "%s.%s", dbName, pTableName);

D
dapan1121 已提交
88 89 90 91
  *pTableMeta = NULL;

  size_t sz = 0;
  STableMeta *tbMeta = taosHashGetCloneExt(pCatalog->tableCache.cache, tbFullName, strlen(tbFullName), NULL, (void **)pTableMeta, &sz);
D
dapan1121 已提交
92

D
dapan1121 已提交
93
  if (NULL == *pTableMeta) {
D
dapan1121 已提交
94 95 96 97
    *exist = 0;
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
98
  *exist = 1;
D
dapan1121 已提交
99

D
dapan1121 已提交
100 101 102 103 104 105 106 107 108 109 110 111 112 113
  if (tbMeta->tableType != TSDB_CHILD_TABLE) {
    return TSDB_CODE_SUCCESS;
  }
  
  CTG_LOCK(CTG_READ, &pCatalog->tableCache.stableLock);
  
  STableMeta **stbMeta = taosHashGet(pCatalog->tableCache.stableCache, &tbMeta->suid, sizeof(tbMeta->suid));
  if (NULL == stbMeta || NULL == *stbMeta) {
    CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock);
    qError("no stable:%"PRIx64 " meta in cache", tbMeta->suid);
    tfree(*pTableMeta);
    *exist = 0;
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
114

D
dapan1121 已提交
115 116 117 118 119 120
  if ((*stbMeta)->suid != tbMeta->suid) {    
    CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock);
    tfree(*pTableMeta);
    ctgError("stable cache error, expected suid:%"PRId64 ",actual suid:%"PRId64, tbMeta->suid, (*stbMeta)->suid);
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }
D
dapan1121 已提交
121

D
dapan1121 已提交
122 123 124 125 126 127
  int32_t metaSize = sizeof(STableMeta) + ((*stbMeta)->tableInfo.numOfTags + (*stbMeta)->tableInfo.numOfColumns) * sizeof(SSchema);
  *pTableMeta = realloc(*pTableMeta, metaSize);
  if (NULL == *pTableMeta) {    
    CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock);
    ctgError("calloc size[%d] failed", metaSize);
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
128 129
  }

D
dapan1121 已提交
130 131 132
  memcpy(&(*pTableMeta)->sversion, &(*stbMeta)->sversion, metaSize - sizeof(SCTableMeta));

  CTG_UNLOCK(CTG_READ, &pCatalog->tableCache.stableLock);
D
dapan1121 已提交
133 134 135 136 137 138 139 140 141 142 143 144 145 146
  
  return TSDB_CODE_SUCCESS;
}

void ctgGenEpSet(SEpSet *epSet, SVgroupInfo *vgroupInfo) {
  epSet->inUse = 0;
  epSet->numOfEps = vgroupInfo->numOfEps;

  for (int32_t i = 0; i < vgroupInfo->numOfEps; ++i) {
    memcpy(&epSet->port[i], &vgroupInfo->epAddr[i].port, sizeof(epSet->port[i]));
    memcpy(&epSet->fqdn[i], &vgroupInfo->epAddr[i].fqdn, sizeof(epSet->fqdn[i]));
  }
}

D
dapan1121 已提交
147 148 149 150 151 152 153 154 155 156 157 158 159 160
int32_t ctgGetTableMetaFromMnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char *pDBName, const char* pTableName, STableMetaOutput* output) {
  if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == output) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  char tbFullName[TSDB_TABLE_FNAME_LEN];

  snprintf(tbFullName, sizeof(tbFullName), "%s.%s", pDBName, pTableName);

  SBuildTableMetaInput bInput = {.vgId = 0, .tableFullName = tbFullName};
  char *msg = NULL;
  SEpSet *pVnodeEpSet = NULL;
  int32_t msgLen = 0;

D
catalog  
dapan1121 已提交
161
  CTG_ERR_RET(queryBuildMsg[TMSG_INDEX(TDMT_MND_STB_META)](&bInput, &msg, 0, &msgLen));
D
dapan1121 已提交
162 163 164 165 166 167

  SRpcMsg rpcMsg = {
      .msgType = TDMT_MND_STB_META,
      .pCont   = msg,
      .contLen = msgLen,
  };
D
dapan1121 已提交
168

D
dapan1121 已提交
169 170 171 172 173 174 175 176 177
  SRpcMsg rpcRsp = {0};

  rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
  
  if (TSDB_CODE_SUCCESS != rpcRsp.code) {
    ctgError("error rsp for table meta, code:%x", rpcRsp.code);
    CTG_ERR_RET(rpcRsp.code);
  }

D
catalog  
dapan1121 已提交
178
  CTG_ERR_RET(queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_STB_META)](output, rpcRsp.pCont, rpcRsp.contLen));
D
dapan1121 已提交
179 180 181 182 183 184

  return TSDB_CODE_SUCCESS;
}


int32_t ctgGetTableMetaFromVnode(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char *pDBName, const char* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) {
D
dapan1121 已提交
185
  if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == vgroupInfo || NULL == output) {
D
dapan1121 已提交
186
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
187 188
  }

D
dapan1121 已提交
189
  char tbFullName[TSDB_TABLE_FNAME_LEN];
D
dapan1121 已提交
190 191 192 193 194 195 196 197

  snprintf(tbFullName, sizeof(tbFullName), "%s.%s", pDBName, pTableName);

  SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .tableFullName = tbFullName};
  char *msg = NULL;
  SEpSet *pVnodeEpSet = NULL;
  int32_t msgLen = 0;

D
catalog  
dapan1121 已提交
198
  CTG_ERR_RET(queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)](&bInput, &msg, 0, &msgLen));
D
dapan1121 已提交
199 200

  SRpcMsg rpcMsg = {
H
Hongze Cheng 已提交
201
      .msgType = TDMT_VND_TABLE_META,
D
dapan1121 已提交
202 203 204 205 206 207 208 209 210 211
      .pCont   = msg,
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
  SEpSet  epSet;
  
  ctgGenEpSet(&epSet, vgroupInfo);

  rpcSendRecv(pRpc, &epSet, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
212
  
D
dapan1121 已提交
213
  if (TSDB_CODE_SUCCESS != rpcRsp.code) {
D
dapan1121 已提交
214
    ctgError("error rsp for table meta, code:%x", rpcRsp.code);
D
dapan1121 已提交
215
    CTG_ERR_RET(rpcRsp.code);
D
dapan1121 已提交
216 217
  }

D
catalog  
dapan1121 已提交
218
  CTG_ERR_RET(queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)](output, rpcRsp.pCont, rpcRsp.contLen));
D
dapan1121 已提交
219 220 221 222 223

  return TSDB_CODE_SUCCESS;
}


224 225
int32_t ctgGetHashFunction(int8_t hashMethod, tableNameHashFp *fp) {
  switch (hashMethod) {
D
dapan1121 已提交
226 227 228 229 230 231 232 233
    default:
      *fp = MurmurHash3_32;
      break;
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
234
int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, SDBVgroupInfo *dbInfo, SArray** vgroupList) {
D
dapan1121 已提交
235
  SHashObj *vgroupHash = NULL;
236
  SVgroupInfo *vgInfo = NULL;
D
dapan1121 已提交
237 238
  SArray *vgList = NULL;
  int32_t code = 0;
239

D
dapan1121 已提交
240 241
  vgList = taosArrayInit(taosHashGetSize(dbInfo->vgInfo), sizeof(SVgroupInfo));
  if (NULL == vgList) {
D
dapan 已提交
242 243 244 245
    ctgError("taosArrayInit failed");
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);    
  }

246 247 248
  void *pIter = taosHashIterate(dbInfo->vgInfo, NULL);
  while (pIter) {
    vgInfo = pIter;
D
dapan1121 已提交
249

D
dapan1121 已提交
250
    if (NULL == taosArrayPush(vgList, vgInfo)) {
251
      ctgError("taosArrayPush failed");
D
dapan1121 已提交
252
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
253 254 255 256
    }
    
    pIter = taosHashIterate(dbInfo->vgInfo, pIter);
    vgInfo = NULL;
D
dapan1121 已提交
257 258
  }

D
dapan1121 已提交
259 260 261
  *vgroupList = vgList;
  vgList = NULL;

D
dapan1121 已提交
262
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
263 264 265 266 267 268 269 270

_return:

  if (vgList) {
    taosArrayDestroy(vgList);
  }

  CTG_RET(code);
D
dapan1121 已提交
271 272
}

273
int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) {
D
dapan1121 已提交
274 275 276 277
  int32_t code = 0;
  
  CTG_LOCK(CTG_READ, &dbInfo->lock);
  
278 279 280
  int32_t vgNum = taosHashGetSize(dbInfo->vgInfo);
  if (vgNum <= 0) {
    ctgError("db[%s] vgroup cache invalid, vgroup number:%d", pDBName, vgNum);
D
dapan1121 已提交
281
    CTG_ERR_JRET(TSDB_CODE_TSC_DB_NOT_SELECTED);
D
dapan1121 已提交
282 283
  }

284 285
  tableNameHashFp fp = NULL;
  SVgroupInfo *vgInfo = NULL;
D
dapan1121 已提交
286

D
dapan1121 已提交
287
  CTG_ERR_JRET(ctgGetHashFunction(dbInfo->hashMethod, &fp));
288 289 290 291 292 293 294 295 296 297 298 299

  char tbFullName[TSDB_TABLE_FNAME_LEN];

  snprintf(tbFullName, sizeof(tbFullName), "%s.%s", pDBName, pTableName);

  uint32_t hashValue = (*fp)(tbFullName, (uint32_t)strlen(tbFullName));

  void *pIter = taosHashIterate(dbInfo->vgInfo, NULL);
  while (pIter) {
    vgInfo = pIter;
    if (hashValue >= vgInfo->hashBegin && hashValue <= vgInfo->hashEnd) {
      break;
D
dapan1121 已提交
300
    }
301 302 303
    
    pIter = taosHashIterate(dbInfo->vgInfo, pIter);
    vgInfo = NULL;
D
dapan1121 已提交
304 305
  }

306 307
  if (NULL == vgInfo) {
    ctgError("no hash range found for hashvalue[%u]", hashValue);
D
dapan1121 已提交
308
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
309 310 311 312
  }

  *pVgroup = *vgInfo;

D
dapan1121 已提交
313 314 315 316 317
_return:

  CTG_UNLOCK(CTG_READ, &dbInfo->lock);
  
  CTG_RET(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
318 319 320
}

int32_t ctgGetTableMetaImpl(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, bool forceUpdate, STableMeta** pTableMeta) {
D
dapan1121 已提交
321
  if (NULL == pCatalog || NULL == pDBName || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) {
D
dapan1121 已提交
322
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
323
  }
D
dapan1121 已提交
324
  
D
dapan1121 已提交
325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340
  int32_t exist = 0;

  if (!forceUpdate) {  
    CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pDBName, pTableName, pTableMeta, &exist));

    if (exist) {
      return TSDB_CODE_SUCCESS;
    }
  }

  CTG_ERR_RET(catalogRenewTableMeta(pCatalog, pRpc, pMgmtEps, pDBName, pTableName));

  CTG_ERR_RET(ctgGetTableMetaFromCache(pCatalog, pDBName, pTableName, pTableMeta, &exist));

  if (0 == exist) {
    ctgError("get table meta from cache failed, but fetch succeed");
D
dapan1121 已提交
341
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
342 343 344 345 346 347
  }
  
  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
348
int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output) {
D
dapan1121 已提交
349 350
  int32_t code = 0;
  
D
dapan1121 已提交
351 352
  if (output->metaNum != 1 && output->metaNum != 2) {
    ctgError("invalid table meta number[%d] got from meta rsp", output->metaNum);
D
dapan1121 已提交
353
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
354 355 356 357
  }

  if (NULL == output->tbMeta) {
    ctgError("no valid table meta got from meta rsp");
D
dapan1121 已提交
358
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
359 360 361 362 363 364
  }

  if (NULL == pCatalog->tableCache.cache) {
    pCatalog->tableCache.cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
    if (NULL == pCatalog->tableCache.cache) {
      ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum);
D
dapan1121 已提交
365
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
366
    }
D
dapan1121 已提交
367
  }
D
dapan1121 已提交
368

D
dapan1121 已提交
369
  if (NULL == pCatalog->tableCache.stableCache) {
D
dapan1121 已提交
370 371 372
    pCatalog->tableCache.stableCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
    if (NULL == pCatalog->tableCache.stableCache) {
      ctgError("init hash[%d] for stablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum);
D
dapan1121 已提交
373
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
374 375 376 377 378 379
    }
  }

  if (output->metaNum == 2) {
    if (taosHashPut(pCatalog->tableCache.cache, output->ctbFname, strlen(output->ctbFname), &output->ctbMeta, sizeof(output->ctbMeta)) != 0) {
      ctgError("push ctable[%s] to table cache failed", output->ctbFname);
D
dapan1121 已提交
380
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
381 382 383 384
    }

    if (TSDB_SUPER_TABLE != output->tbMeta->tableType) {
      ctgError("table type[%d] error, expected:%d", output->tbMeta->tableType, TSDB_SUPER_TABLE);
D
dapan1121 已提交
385
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
386 387 388
    }    
  }

D
dapan1121 已提交
389
  int32_t tbSize = sizeof(*output->tbMeta) + sizeof(SSchema) * (output->tbMeta->tableInfo.numOfColumns + output->tbMeta->tableInfo.numOfTags);
D
dapan1121 已提交
390 391

  if (TSDB_SUPER_TABLE == output->tbMeta->tableType) {
D
dapan1121 已提交
392 393 394 395 396 397 398 399 400 401
    CTG_LOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
    if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) {
      CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
      ctgError("push table[%s] to table cache failed", output->tbFname);
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
    }

    STableMeta *tbMeta = taosHashGet(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname));
    if (taosHashPut(pCatalog->tableCache.stableCache, &output->tbMeta->suid, sizeof(output->tbMeta->suid), &tbMeta, POINTER_BYTES) != 0) {
      CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
D
dapan1121 已提交
402
      ctgError("push suid[%"PRIu64"] to stable cache failed", output->tbMeta->suid);
D
dapan1121 已提交
403 404 405 406 407 408 409
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
    }
    CTG_UNLOCK(CTG_WRITE, &pCatalog->tableCache.stableLock);
  } else {
    if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) {
      ctgError("push table[%s] to table cache failed", output->tbFname);
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
410 411
    }
  }
D
dapan1121 已提交
412 413 414

_return:
  tfree(output->tbMeta);
D
dapan1121 已提交
415
  
D
dapan1121 已提交
416
  CTG_RET(code);
D
dapan1121 已提交
417 418
}

D
dapan1121 已提交
419

D
dapan1121 已提交
420 421
int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo** dbInfo) {
  bool inCache = false;
D
dapan1121 已提交
422
  if (0 == forceUpdate) {
D
dapan1121 已提交
423
    CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &inCache));
D
dapan1121 已提交
424

D
dapan1121 已提交
425
    if (inCache) {
D
dapan1121 已提交
426 427 428 429 430 431 432 433 434 435 436 437 438 439 440
      return TSDB_CODE_SUCCESS;
    }
  }

  SUseDbOutput DbOut = {0};
  SBuildUseDBInput input = {0};

  strncpy(input.db, dbName, sizeof(input.db));
  input.db[sizeof(input.db) - 1] = 0;
  input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
  
  CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &input, &DbOut));

  CTG_ERR_RET(catalogUpdateDBVgroup(pCatalog, dbName, &DbOut.dbVgroup));

D
dapan1121 已提交
441
  CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &inCache));
D
dapan1121 已提交
442 443 444 445 446

  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
447
int32_t catalogInit(SCatalogCfg *cfg) {
D
dapan1121 已提交
448 449 450
  if (ctgMgmt.pCluster) {
    ctgError("catalog already init");
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
451 452 453 454
  }

  if (cfg) {
    memcpy(&ctgMgmt.cfg, cfg, sizeof(*cfg));
D
dapan1121 已提交
455 456 457 458 459 460 461 462
    
    if (ctgMgmt.cfg.maxDBCacheNum == 0) {
      ctgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
    }

    if (ctgMgmt.cfg.maxTblCacheNum == 0) {
      ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TABLEMETA_NUMBER;
    }
D
dapan1121 已提交
463 464 465
  } else {
    ctgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
    ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TABLEMETA_NUMBER;
D
dapan 已提交
466 467
  }

D
dapan1121 已提交
468 469 470
  ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
  if (NULL == ctgMgmt.pCluster) {
    CTG_ERR_LRET(TSDB_CODE_CTG_INTERNAL_ERROR, "init %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER);
D
dapan1121 已提交
471 472
  }

D
dapan 已提交
473
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
474 475
}

476
int32_t catalogGetHandle(const char* clusterId , struct SCatalog** catalogHandle) {
D
dapan1121 已提交
477
  if (NULL == clusterId || NULL == catalogHandle) {
D
dapan1121 已提交
478
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan 已提交
479 480 481 482
  }

  if (NULL == ctgMgmt.pCluster) {
    ctgError("cluster cache are not ready");
D
dapan1121 已提交
483
    CTG_ERR_RET(TSDB_CODE_CTG_NOT_READY);
D
dapan 已提交
484 485 486
  }

  size_t clen = strlen(clusterId);
D
catalog  
dapan1121 已提交
487
  SCatalog **ctg = (SCatalog **)taosHashGet(ctgMgmt.pCluster, clusterId, clen);
D
dapan 已提交
488

D
catalog  
dapan1121 已提交
489 490
  if (ctg && (*ctg)) {
    *catalogHandle = *ctg;
D
dapan1121 已提交
491
    return TSDB_CODE_SUCCESS;
D
dapan 已提交
492 493
  }

D
catalog  
dapan1121 已提交
494
  SCatalog *clusterCtg = calloc(1, sizeof(SCatalog));
D
dapan 已提交
495
  if (NULL == clusterCtg) {
D
catalog  
dapan1121 已提交
496
    ctgError("calloc %d failed", (int32_t)sizeof(SCatalog));
D
dapan1121 已提交
497
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan 已提交
498 499 500 501 502
  }

  if (taosHashPut(ctgMgmt.pCluster, clusterId, clen, &clusterCtg, POINTER_BYTES)) {
    ctgError("put cluster %s cache to hash failed", clusterId);
    tfree(clusterCtg);
D
dapan1121 已提交
503
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan 已提交
504
  }
D
dapan1121 已提交
505 506

  *catalogHandle = clusterCtg;
D
dapan 已提交
507
  
D
dapan1121 已提交
508
  return TSDB_CODE_SUCCESS;
D
dapan 已提交
509 510
}

D
dapan1121 已提交
511 512
int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version) {
  if (NULL == pCatalog || NULL == dbName || NULL == version) {
D
dapan1121 已提交
513
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
514 515 516 517 518 519 520
  }

  if (NULL == pCatalog->dbCache.cache) {
    *version = CTG_DEFAULT_INVALID_VERSION;
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
521
  SDBVgroupInfo * dbInfo = taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName));
D
dapan1121 已提交
522 523 524 525 526
  if (NULL == dbInfo) {
    *version = CTG_DEFAULT_INVALID_VERSION;
    return TSDB_CODE_SUCCESS;
  }

527
  *version = dbInfo->vgVersion;
D
dapan1121 已提交
528
  taosHashRelease(pCatalog->dbCache.cache, dbInfo);
D
dapan1121 已提交
529 530 531 532

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SArray** vgroupList) {
  if (NULL == pCatalog || NULL == dbName || NULL == pRpc || NULL == pMgmtEps || NULL == vgroupList) {
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  SDBVgroupInfo* db = NULL;
  int32_t code = 0;
  SVgroupInfo *vgInfo = NULL;
  SArray *vgList = NULL;
  
  CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, dbName, forceUpdate, &db));

  vgList = taosArrayInit(taosHashGetSize(db->vgInfo), sizeof(SVgroupInfo));
  if (NULL == vgList) {
    ctgError("taosArrayInit failed");
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);    
  }

  void *pIter = taosHashIterate(db->vgInfo, NULL);
  while (pIter) {
    vgInfo = pIter;

    if (NULL == taosArrayPush(vgList, vgInfo)) {
      ctgError("taosArrayPush failed");
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
    }
    
    pIter = taosHashIterate(db->vgInfo, pIter);
    vgInfo = NULL;
  }

  *vgroupList = vgList;
  vgList = NULL;

_return:

  if (db) {
    CTG_UNLOCK(CTG_READ, &db->lock);
    taosHashRelease(pCatalog->dbCache.cache, db);
  }

  if (vgList) {
    taosArrayDestroy(vgList);
    vgList = NULL;
  }

  CTG_RET(code);  
}


D
dapan1121 已提交
583
int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) {
D
dapan1121 已提交
584
  if (NULL == pCatalog || NULL == dbName || NULL == dbInfo) {
D
dapan1121 已提交
585
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
586 587
  }

588
  if (dbInfo->vgVersion < 0) {
D
dapan1121 已提交
589
    if (pCatalog->dbCache.cache) {
D
dapan1121 已提交
590 591 592 593 594 595 596 597 598 599
      SDBVgroupInfo *oldInfo = taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName));
      if (oldInfo) {
        CTG_LOCK(CTG_WRITE, &oldInfo->lock);
        if (oldInfo->vgInfo) {
          taosHashCleanup(oldInfo->vgInfo);
          oldInfo->vgInfo = NULL;
        }
        CTG_UNLOCK(CTG_WRITE, &oldInfo->lock);

        taosHashRelease(pCatalog->dbCache.cache, oldInfo);
D
dapan1121 已提交
600
      }
D
dapan1121 已提交
601 602 603 604 605
    }
    
    ctgWarn("remove db [%s] from cache", dbName);
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
606

D
dapan1121 已提交
607
  if (NULL == pCatalog->dbCache.cache) {
D
dapan1121 已提交
608
    pCatalog->dbCache.cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
D
dapan1121 已提交
609
    if (NULL == pCatalog->dbCache.cache) {
D
dapan1121 已提交
610
      ctgError("init hash[%d] for db cache failed", CTG_DEFAULT_CACHE_DB_NUMBER);
D
dapan1121 已提交
611
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
612
    }
613
  } else {
D
dapan1121 已提交
614 615 616 617 618 619 620 621 622 623
    SDBVgroupInfo *oldInfo = taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName));
    if (oldInfo) {
      CTG_LOCK(CTG_WRITE, &oldInfo->lock);
      if (oldInfo->vgInfo) {
        taosHashCleanup(oldInfo->vgInfo);
        oldInfo->vgInfo = NULL;
      }
      CTG_UNLOCK(CTG_WRITE, &oldInfo->lock);
    
      taosHashRelease(pCatalog->dbCache.cache, oldInfo);
624
    }
D
dapan1121 已提交
625 626 627 628
  }

  if (taosHashPut(pCatalog->dbCache.cache, dbName, strlen(dbName), dbInfo, sizeof(*dbInfo)) != 0) {
    ctgError("push to vgroup hash cache failed");
D
dapan1121 已提交
629
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
630 631 632
  }

  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
633 634
}

635 636
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) {
  return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pDBName, pTableName, false, pTableMeta);
D
dapan1121 已提交
637
}
D
dapan1121 已提交
638

D
dapan1121 已提交
639 640
int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName) {
  if (NULL == pCatalog || NULL == pDBName || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName) {
D
dapan1121 已提交
641
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan 已提交
642 643
  }

D
dapan1121 已提交
644
  SVgroupInfo vgroupInfo = {0};
D
dapan1121 已提交
645
  int32_t code = 0;
D
dapan1121 已提交
646
  
D
dapan1121 已提交
647
  CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &vgroupInfo));
D
dapan1121 已提交
648

D
dapan1121 已提交
649 650
  STableMetaOutput output = {0};
  
D
dapan1121 已提交
651 652 653
  //CTG_ERR_RET(ctgGetTableMetaFromVnode(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &vgroupInfo, &output));

  CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &output));
D
dapan1121 已提交
654

D
dapan1121 已提交
655 656 657
  CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, &output));

_return:
D
dapan1121 已提交
658

D
dapan1121 已提交
659 660
  tfree(output.tbMeta);
  
D
dapan1121 已提交
661
  CTG_RET(code);
662
}
663

D
dapan1121 已提交
664
int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) {
D
dapan1121 已提交
665 666 667
  return ctgGetTableMetaImpl(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, true, pTableMeta);
}

D
dapan 已提交
668
int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray** pVgroupList) {
D
dapan1121 已提交
669
  if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == pVgroupList) {
D
dapan1121 已提交
670
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
671 672 673 674 675
  }
  
  STableMeta *tbMeta = NULL;
  int32_t code = 0;
  SVgroupInfo vgroupInfo = {0};
D
dapan1121 已提交
676 677 678 679
  SDBVgroupInfo* dbVgroup = NULL;
  SArray *vgList = NULL;

  *pVgroupList = NULL;
D
dapan1121 已提交
680 681 682
  
  CTG_ERR_JRET(catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &tbMeta));

D
dapan1121 已提交
683
  CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbVgroup));
D
dapan 已提交
684

685
  if (tbMeta->tableType == TSDB_SUPER_TABLE) {
D
dapan1121 已提交
686
    CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, dbVgroup, pVgroupList));
D
dapan1121 已提交
687
  } else {
688
    int32_t vgId = tbMeta->vgId;
D
dapan1121 已提交
689
    if (NULL == taosHashGetClone(dbVgroup->vgInfo, &vgId, sizeof(vgId), &vgroupInfo)) {
690
      ctgError("vgId[%d] not found in vgroup list", vgId);
D
dapan 已提交
691
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);    
692
    }
D
dapan1121 已提交
693

D
dapan1121 已提交
694 695
    vgList = taosArrayInit(1, sizeof(SVgroupInfo));
    if (NULL == vgList) {
D
dapan 已提交
696 697 698 699
      ctgError("taosArrayInit failed");
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);    
    }

D
dapan1121 已提交
700
    if (NULL == taosArrayPush(vgList, &vgroupInfo)) {
D
dapan1121 已提交
701 702 703
      ctgError("push vgroupInfo to array failed");
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
    }
D
dapan 已提交
704

D
dapan1121 已提交
705 706 707
    *pVgroupList = vgList;
    vgList = NULL;
  }
D
dapan 已提交
708

D
dapan1121 已提交
709 710
_return:
  tfree(tbMeta);
D
dapan 已提交
711

D
dapan1121 已提交
712 713 714 715 716 717 718 719 720
  if (dbVgroup) {
    CTG_UNLOCK(CTG_READ, &dbVgroup->lock);
    taosHashRelease(pCatalog->dbCache.cache, dbVgroup);
  }

  if (vgList) {
    taosArrayDestroy(vgList);
    vgList = NULL;
  }
D
dapan1121 已提交
721
  
D
dapan1121 已提交
722
  CTG_RET(code);
D
dapan1121 已提交
723 724 725
}


726
int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter, const SEpSet *pMgmtEps, const char *pDBName, const char *pTableName, SVgroupInfo *pVgroup) {
D
dapan1121 已提交
727
  SDBVgroupInfo* dbInfo = NULL;
D
dapan1121 已提交
728 729 730
  int32_t code = 0;
  int32_t vgId = 0;

D
dapan1121 已提交
731
  CTG_ERR_RET(ctgGetDBVgroup(pCatalog, pTransporter, pMgmtEps, pDBName, false, &dbInfo));
D
dapan1121 已提交
732

D
dapan1121 已提交
733
  CTG_ERR_JRET(ctgGetVgInfoFromHashValue(dbInfo, pDBName, pTableName, pVgroup));
D
dapan1121 已提交
734

D
dapan1121 已提交
735 736 737 738 739 740
_return:

  if (dbInfo) {
    CTG_UNLOCK(CTG_READ, &dbInfo->lock);  
    taosHashRelease(pCatalog->dbCache.cache, dbInfo);
  }
D
dapan1121 已提交
741

D
dapan1121 已提交
742
  CTG_RET(code);
D
dapan1121 已提交
743 744 745
}


D
dapan1121 已提交
746 747
int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp) {
  if (NULL == pCatalog || NULL == pRpc  || NULL == pMgmtEps || NULL == pReq || NULL == pRsp) {
D
dapan1121 已提交
748
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan 已提交
749
  }
D
dapan1121 已提交
750 751 752 753

  int32_t code = 0;

  if (pReq->pTableName) {
754
    char dbName[TSDB_DB_FNAME_LEN];
D
dapan1121 已提交
755
    int32_t tbNum = (int32_t)taosArrayGetSize(pReq->pTableName);
D
dapan1121 已提交
756 757 758 759 760 761 762 763 764
    if (tbNum <= 0) {
      ctgError("empty table name list");
      CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
    }
    
    pRsp->pTableMeta = taosArrayInit(tbNum, POINTER_BYTES);
    if (NULL == pRsp->pTableMeta) {
      ctgError("taosArrayInit num[%d] failed", tbNum);
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
765 766 767 768 769 770
    }
    
    for (int32_t i = 0; i < tbNum; ++i) {
      SName *name = taosArrayGet(pReq->pTableName, i);
      STableMeta *pTableMeta = NULL;
      
H
Haojun Liao 已提交
771
      snprintf(dbName, sizeof(dbName), "%d.%s", name->acctId, name->dbname);
D
dapan1121 已提交
772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794

      CTG_ERR_JRET(catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, dbName, name->tname, &pTableMeta));

      if (NULL == taosArrayPush(pRsp->pTableMeta, &pTableMeta)) {
        ctgError("taosArrayPush failed, idx:%d", i);
        tfree(pTableMeta);
        CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
      }
    }
  }

  return TSDB_CODE_SUCCESS;

_return:  

  if (pRsp->pTableMeta) {
    int32_t aSize = taosArrayGetSize(pRsp->pTableMeta);
    for (int32_t i = 0; i < aSize; ++i) {
      STableMeta *pMeta = taosArrayGetP(pRsp->pTableMeta, i);
      tfree(pMeta);
    }
    
    taosArrayDestroy(pRsp->pTableMeta);
D
dapan1121 已提交
795
    pRsp->pTableMeta = NULL;
D
dapan1121 已提交
796
  }
D
dapan 已提交
797
  
D
dapan1121 已提交
798
  CTG_RET(code);
799
}
D
dapan 已提交
800

D
dapan1121 已提交
801 802
int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray* pQnodeList) {
  if (NULL == pCatalog || NULL == pRpc  || NULL == pMgmtEps || NULL == pQnodeList) {
D
dapan 已提交
803 804 805 806 807 808 809 810
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }


  return TSDB_CODE_SUCCESS;
}


D
dapan 已提交
811 812 813 814 815 816 817 818 819
void catalogDestroy(void) {
  if (ctgMgmt.pCluster) {
    taosHashCleanup(ctgMgmt.pCluster); //TBD
    ctgMgmt.pCluster = NULL;
  }
}