catalog.c 68.5 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
H
Haojun Liao 已提交
14 15
 */

D
dapan1121 已提交
16
#include "trpc.h"
D
dapan1121 已提交
17
#include "query.h"
D
dapan1121 已提交
18
#include "tname.h"
H
Haojun Liao 已提交
19
#include "catalogInt.h"
20

D
dapan 已提交
21 22 23 24 25 26
int32_t ctgActUpdateVg(SCtgMetaAction *action);
int32_t ctgActUpdateTbl(SCtgMetaAction *action);
int32_t ctgActRemoveDB(SCtgMetaAction *action);
int32_t ctgActRemoveStb(SCtgMetaAction *action);
int32_t ctgActRemoveTbl(SCtgMetaAction *action);

D
dapan 已提交
27
SCatalogMgmt gCtgMgmt = {0};
D
dapan1121 已提交
28
SCtgDebug gCTGDebug = {0};
D
dapan 已提交
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
SCtgAction gCtgAction[CTG_ACT_MAX] = {{
                            CTG_ACT_UPDATE_VG,
                            "update vgInfo",
                            ctgActUpdateVg
                          },
                          {
                            CTG_ACT_UPDATE_TBL,
                            "update tbMeta",
                            ctgActUpdateTbl
                          },
                          {
                            CTG_ACT_REMOVE_DB,
                            "remove DB",
                            ctgActRemoveDB
                          },
                          {
                            CTG_ACT_REMOVE_STB,
                            "remove stbMeta",
                            ctgActRemoveStb
                          },
                          {
                            CTG_ACT_REMOVE_TBL,
                            "remove tbMeta",
                            ctgActRemoveTbl
                          }
};

int32_t ctgDbgEnableDebug(char *option) {
  if (0 == strcasecmp(option, "lock")) {
    gCTGDebug.lockDebug = true;
    qDebug("lock debug enabled");
    return TSDB_CODE_SUCCESS;
  }

  if (0 == strcasecmp(option, "cache")) {
    gCTGDebug.cacheDebug = true;
    qDebug("cache debug enabled");
    return TSDB_CODE_SUCCESS;
  }

  if (0 == strcasecmp(option, "api")) {
    gCTGDebug.apiDebug = true;
    qDebug("api debug enabled");
    return TSDB_CODE_SUCCESS;
  }

  qError("invalid debug option:%s", option);
  
  return TSDB_CODE_CTG_INTERNAL_ERROR;
}

int32_t ctgDbgGetStatNum(char *option, void *res) {
  if (0 == strcasecmp(option, "runtime.qDoneNum")) {
    *(uint64_t *)res = atomic_load_64(&gCtgMgmt.stat.runtime.qDoneNum);
    return TSDB_CODE_SUCCESS;
  }
85

D
dapan 已提交
86 87 88 89
  qError("invalid stat option:%s", option);
  
  return TSDB_CODE_CTG_INTERNAL_ERROR;
}
D
dapan1121 已提交
90

D
dapan1121 已提交
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
int32_t ctgDbgGetTbMetaNum(SCtgDBCache *dbCache) {
  return dbCache->tbCache.metaCache ? (int32_t)taosHashGetSize(dbCache->tbCache.metaCache) : 0;
}

int32_t ctgDbgGetStbNum(SCtgDBCache *dbCache) {
  return dbCache->tbCache.stbCache ? (int32_t)taosHashGetSize(dbCache->tbCache.stbCache) : 0;
}

int32_t ctgDbgGetRentNum(SCtgRentMgmt *rent) {
  int32_t num = 0;
  for (uint16_t i = 0; i < rent->slotNum; ++i) {
    SCtgRentSlot *slot = &rent->slots[i];
    if (NULL == slot->meta) {
      continue;
    }

    num += taosArrayGetSize(slot->meta);
  }

  return num;
}

D
dapan1121 已提交
113 114
int32_t ctgDbgGetClusterCacheNum(SCatalog* pCtg, int32_t type) {
  if (NULL == pCtg || NULL == pCtg->dbCache) {
D
dapan1121 已提交
115 116 117 118 119
    return 0;
  }

  switch (type) {
    case CTG_DBG_DB_NUM:
D
dapan1121 已提交
120
      return (int32_t)taosHashGetSize(pCtg->dbCache);
D
dapan1121 已提交
121
    case CTG_DBG_DB_RENT_NUM:
D
dapan1121 已提交
122
      return ctgDbgGetRentNum(&pCtg->dbRent);
D
dapan1121 已提交
123
    case CTG_DBG_STB_RENT_NUM:
D
dapan1121 已提交
124
      return ctgDbgGetRentNum(&pCtg->stbRent);
D
dapan1121 已提交
125 126 127 128 129 130
    default:
      break;
  }

  SCtgDBCache *dbCache = NULL;
  int32_t num = 0;
D
dapan1121 已提交
131
  void *pIter = taosHashIterate(pCtg->dbCache, NULL);
D
dapan1121 已提交
132 133 134 135 136 137 138 139 140 141 142 143 144
  while (pIter) {
    dbCache = (SCtgDBCache *)pIter;
    switch (type) {
      case CTG_DBG_META_NUM:
        num += ctgDbgGetTbMetaNum(dbCache);
        break;
      case CTG_DBG_STB_NUM:
        num += ctgDbgGetStbNum(dbCache);
        break;
      default:
        ctgError("invalid type:%d", type);
        break;
    }
D
dapan1121 已提交
145
    pIter = taosHashIterate(pCtg->dbCache, pIter);
D
dapan1121 已提交
146 147 148 149 150 151 152
  }

  return num;
}


void ctgDbgShowDBCache(SHashObj *dbHash) {
D
dapan1121 已提交
153 154 155 156 157 158 159 160 161 162 163 164 165
  if (NULL == dbHash) {
    return;
  }

  int32_t i = 0;
  SCtgDBCache *dbCache = NULL;
  void *pIter = taosHashIterate(dbHash, NULL);
  while (pIter) {
    char *dbFName = NULL;
    size_t len = 0;
    
    dbCache = (SCtgDBCache *)pIter;

D
dapan1121 已提交
166
    taosHashGetKey(dbCache, (void **)&dbFName, &len);
D
dapan1121 已提交
167
    
D
dapan1121 已提交
168
    CTG_CACHE_DEBUG("** %dth db [%.*s][%"PRIx64"] **", i, (int32_t)len, dbFName, dbCache->dbId);
D
dapan1121 已提交
169 170 171 172 173
    
    pIter = taosHashIterate(dbHash, pIter);
  }
}

D
dapan1121 已提交
174 175 176



D
dapan1121 已提交
177 178
void ctgDbgShowClusterCache(SCatalog* pCtg) {
  if (NULL == pCtg) {
D
dapan1121 已提交
179 180 181
    return;
  }

D
dapan1121 已提交
182 183 184
  CTG_CACHE_DEBUG("## cluster %"PRIx64" %p cache Info ##", pCtg->clusterId, pCtg);
  CTG_CACHE_DEBUG("db:%d meta:%d stb:%d dbRent:%d stbRent:%d", ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM), ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM), 
    ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_STB_NUM), ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_RENT_NUM), ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_STB_RENT_NUM));    
D
dapan1121 已提交
185
  
D
dapan1121 已提交
186
  ctgDbgShowDBCache(pCtg->dbCache);
D
dapan1121 已提交
187 188
}

D
dapan 已提交
189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227


void ctgPopAction(SCtgMetaAction **action) {
  SCtgQNode *orig = gCtgMgmt.head;
  
  SCtgQNode *node = gCtgMgmt.head->next;
  gCtgMgmt.head = gCtgMgmt.head->next;

  CTG_QUEUE_SUB();
  
  tfree(orig);

  *action = &node->action;
}


int32_t ctgPushAction(SCtgMetaAction *action) {
  SCtgQNode *node = calloc(1, sizeof(SCtgQNode));
  if (NULL == node) {
    qError("calloc %d failed", (int32_t)sizeof(SCtgQNode));
    CTG_RET(TSDB_CODE_CTG_MEM_ERROR);
  }
  
  node->action = *action;

  CTG_LOCK(CTG_WRITE, &gCtgMgmt.qlock);
  gCtgMgmt.tail->next = node;
  gCtgMgmt.tail = node;
  CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.qlock);

  CTG_QUEUE_ADD();
  //CTG_STAT_ADD(gCtgMgmt.stat.runtime.qNum);

  tsem_post(&gCtgMgmt.sem);

  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244
void ctgFreeMetaRent(SCtgRentMgmt *mgmt) {
  if (NULL == mgmt->slots) {
    return;
  }

  for (int32_t i = 0; i < mgmt->slotNum; ++i) {
    SCtgRentSlot *slot = &mgmt->slots[i];
    if (slot->meta) {
      taosArrayDestroy(slot->meta);
      slot->meta = NULL;
    }
  }

  tfree(mgmt->slots);
}


D
dapan1121 已提交
245 246 247 248 249
void ctgFreeTableMetaCache(SCtgTbMetaCache *cache) {
  CTG_LOCK(CTG_WRITE, &cache->stbLock);
  if (cache->stbCache) {
    taosHashCleanup(cache->stbCache);
    cache->stbCache = NULL;
D
dapan1121 已提交
250
  }
D
dapan1121 已提交
251
  CTG_UNLOCK(CTG_WRITE, &cache->stbLock);
D
dapan1121 已提交
252

D
dapan1121 已提交
253 254 255 256
  CTG_LOCK(CTG_WRITE, &cache->metaLock);
  if (cache->metaCache) {
    taosHashCleanup(cache->metaCache);
    cache->metaCache = NULL;
D
dapan1121 已提交
257
  }
D
dapan1121 已提交
258
  CTG_UNLOCK(CTG_WRITE, &cache->metaLock);
D
dapan1121 已提交
259 260
}

D
dapan1121 已提交
261 262 263 264 265 266 267 268 269 270 271 272 273
void ctgFreeVgInfo(SDBVgInfo *vgInfo) {
  if (NULL == vgInfo) {
    return;
  }

  if (vgInfo->vgHash) {
    taosHashCleanup(vgInfo->vgHash);
    vgInfo->vgHash = NULL;
  }
  
  tfree(vgInfo);
}

D
dapan1121 已提交
274 275 276 277 278
void ctgFreeDbCache(SCtgDBCache *dbCache) {
  if (NULL == dbCache) {
    return;
  }

D
dapan1121 已提交
279
  CTG_LOCK(CTG_WRITE, &dbCache->vgLock);
D
dapan1121 已提交
280
  ctgFreeVgInfo (dbCache->vgInfo);
D
dapan1121 已提交
281
  CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
D
dapan1121 已提交
282 283 284 285 286

  ctgFreeTableMetaCache(&dbCache->tbCache);
}


D
dapan1121 已提交
287 288 289 290 291 292
void ctgFreeHandle(SCatalog* pCtg) {
  ctgFreeMetaRent(&pCtg->dbRent);
  ctgFreeMetaRent(&pCtg->stbRent);
  
  if (pCtg->dbCache) {
    void *pIter = taosHashIterate(pCtg->dbCache, NULL);
D
dapan1121 已提交
293 294 295
    while (pIter) {
      SCtgDBCache *dbCache = pIter;

D
dapan1121 已提交
296 297
      atomic_store_8(&dbCache->deleted, 1);

D
dapan1121 已提交
298
      ctgFreeDbCache(dbCache);
D
dapan1121 已提交
299 300
            
      pIter = taosHashIterate(pCtg->dbCache, pIter);
D
dapan1121 已提交
301 302
    }  

D
dapan1121 已提交
303
    taosHashCleanup(pCtg->dbCache);
D
dapan1121 已提交
304 305
  }
  
D
dapan1121 已提交
306
  free(pCtg);
D
dapan1121 已提交
307 308
}

D
dapan1121 已提交
309

D
dapan 已提交
310
int32_t ctgAcquireVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache, bool *inCache) {
D
dapan1121 已提交
311 312
  CTG_LOCK(CTG_READ, &dbCache->vgLock);
  
D
dapan 已提交
313 314 315 316 317 318 319 320 321 322
  if (dbCache->deleted) {
    CTG_UNLOCK(CTG_READ, &dbCache->vgLock);

    ctgDebug("db is dropping, dbId:%"PRIx64, dbCache->dbId);
    
    *inCache = false;
    return TSDB_CODE_SUCCESS;
  }

  
D
dapan1121 已提交
323 324 325
  if (NULL == dbCache->vgInfo) {
    CTG_UNLOCK(CTG_READ, &dbCache->vgLock);

D
dapan 已提交
326
    *inCache = false;
D
dapan1121 已提交
327
    ctgDebug("db vgInfo is empty, dbId:%"PRIx64, dbCache->dbId);
D
dapan1121 已提交
328 329 330
    return TSDB_CODE_SUCCESS;
  }

D
dapan 已提交
331 332
  *inCache = true;
  
D
dapan1121 已提交
333 334 335 336 337 338 339 340 341 342 343 344 345 346
  return TSDB_CODE_SUCCESS;
}

int32_t ctgWAcquireVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache) {
  CTG_LOCK(CTG_WRITE, &dbCache->vgLock);

  if (dbCache->deleted) {
    ctgDebug("db is dropping, dbId:%"PRIx64, dbCache->dbId);
    CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
    CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
  }

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
347

D
dapan1121 已提交
348 349 350
void ctgReleaseDBCache(SCatalog *pCtg, SCtgDBCache *dbCache) {
  taosHashRelease(pCtg->dbCache, dbCache);
}
D
dapan1121 已提交
351

D
dapan1121 已提交
352 353 354
void ctgReleaseVgInfo(SCtgDBCache *dbCache) {
  CTG_UNLOCK(CTG_READ, &dbCache->vgLock);
}
D
dapan1121 已提交
355

D
dapan1121 已提交
356 357 358
void ctgWReleaseVgInfo(SCtgDBCache *dbCache) {
  CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
}
D
dapan1121 已提交
359

D
dapan1121 已提交
360

D
dapan 已提交
361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398
int32_t ctgAcquireDBCacheImpl(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache, bool acquire) {
  SCtgDBCache *dbCache = NULL;
  if (acquire) {
    dbCache = (SCtgDBCache *)taosHashAcquire(pCtg->dbCache, dbFName, strlen(dbFName));
  } else {
    dbCache = (SCtgDBCache *)taosHashGet(pCtg->dbCache, dbFName, strlen(dbFName));
  }
  
  if (NULL == dbCache) {
    *pCache = NULL;
    ctgDebug("db not in cache, dbFName:%s", dbFName);
    return TSDB_CODE_SUCCESS;
  }

  if (dbCache->deleted) {
    if (acquire) {
      ctgReleaseDBCache(pCtg, dbCache);
    }    
    
    *pCache = NULL;
    ctgDebug("db is removing from cache, dbFName:%s", dbFName);
    return TSDB_CODE_SUCCESS;
  }

  *pCache = dbCache;
    
  return TSDB_CODE_SUCCESS;
}

int32_t ctgAcquireDBCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache) {
  CTG_RET(ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, true));
}

int32_t ctgGetDBCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache) {
  CTG_RET(ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, false));
}


D
dapan1121 已提交
399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414
int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache, bool *inCache) {
  if (NULL == pCtg->dbCache) {
    *pCache = NULL;
    *inCache = false;
    ctgWarn("empty db cache, dbFName:%s", dbFName);
    return TSDB_CODE_SUCCESS;
  }

  SCtgDBCache *dbCache = NULL;
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
  if (NULL == dbCache) {  
    *pCache = NULL;
    *inCache = false;
    return TSDB_CODE_SUCCESS;
  }
  
D
dapan 已提交
415 416
  ctgAcquireVgInfo(pCtg, dbCache, inCache);
  if (!(*inCache)) {
D
dapan1121 已提交
417 418 419 420
    ctgReleaseDBCache(pCtg, dbCache);
  
    *pCache = NULL;
    return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
421
  }
D
dapan1121 已提交
422

D
dapan1121 已提交
423
  *pCache = dbCache;
D
dapan1121 已提交
424
  *inCache = true;
D
dapan1121 已提交
425

D
dapan1121 已提交
426
  ctgDebug("Got db vgInfo from cache, dbFName:%s", dbFName);
D
dapan1121 已提交
427 428 429 430 431 432
  
  return TSDB_CODE_SUCCESS;
}



D
dapan1121 已提交
433
int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, SBuildUseDBInput *input, SUseDbOutput *out) {
D
dapan1121 已提交
434 435 436
  char *msg = NULL;
  int32_t msgLen = 0;

D
dapan 已提交
437
  ctgDebug("try to get db vgInfo from mnode, dbFName:%s", input->db);
D
dapan1121 已提交
438 439 440 441 442 443

  int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)](input, &msg, 0, &msgLen);
  if (code) {
    ctgError("Build use db msg failed, code:%x, db:%s", code, input->db);
    CTG_ERR_RET(code);
  }
D
ut test  
dapan1121 已提交
444
  
D
dapan1121 已提交
445
  SRpcMsg rpcMsg = {
H
Hongze Cheng 已提交
446
      .msgType = TDMT_MND_USE_DB,
D
catalog  
dapan1121 已提交
447
      .pCont   = msg,
D
dapan1121 已提交
448 449 450 451 452 453
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};

  rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
454
  if (TSDB_CODE_SUCCESS != rpcRsp.code) {
D
dapan1121 已提交
455 456 457 458 459
    if (CTG_DB_NOT_EXIST(rpcRsp.code)) {
      ctgDebug("db not exist in mnode, dbFName:%s", input->db);
      return rpcRsp.code;
    }
  
H
Haojun Liao 已提交
460
    ctgError("error rsp for use db, code:%s, db:%s", tstrerror(rpcRsp.code), input->db);
D
dapan1121 已提交
461
    CTG_ERR_RET(rpcRsp.code);
D
dapan1121 已提交
462
  }
D
dapan1121 已提交
463

D
dapan1121 已提交
464 465 466 467 468
  code = queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)](out, rpcRsp.pCont, rpcRsp.contLen);
  if (code) {
    ctgError("Process use db rsp failed, code:%x, db:%s", code, input->db);
    CTG_ERR_RET(code);
  }
D
dapan1121 已提交
469

D
dapan 已提交
470 471
  ctgDebug("Got db vgInfo from mnode, dbFName:%s", input->db);

D
dapan1121 已提交
472 473
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
474

D
dapan1121 已提交
475 476
int32_t ctgIsTableMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32_t *exist) {
  if (NULL == pCtg->dbCache) {
D
dapan1121 已提交
477
    *exist = 0;
D
dapan1121 已提交
478 479 480 481
    ctgWarn("empty db cache, dbFName:%s, tbName:%s", dbFName, tbName);
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
482 483
  SCtgDBCache *dbCache = NULL;
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
D
dapan1121 已提交
484 485
  if (NULL == dbCache) {
    *exist = 0;
D
dapan1121 已提交
486 487 488
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
489
  size_t sz = 0;
D
dapan1121 已提交
490 491 492 493
  CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);  
  STableMeta *tbMeta = taosHashGet(dbCache->tbCache.metaCache, tbName, strlen(tbName));
  CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
  
D
dapan1121 已提交
494
  if (NULL == tbMeta) {
D
dapan 已提交
495
    ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
496
    
D
dapan1121 已提交
497
    *exist = 0;
D
dapan1121 已提交
498
    ctgDebug("tbmeta not in cache, dbFName:%s, tbName:%s", dbFName, tbName);
D
dapan1121 已提交
499 500 501 502
    return TSDB_CODE_SUCCESS;
  }

  *exist = 1;
D
dapan1121 已提交
503

D
dapan 已提交
504
  ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
505
  
D
dapan1121 已提交
506
  ctgDebug("tbmeta is in cache, dbFName:%s, tbName:%s", dbFName, tbName);
D
dapan1121 已提交
507 508 509 510
  
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
511

D
dapan1121 已提交
512 513
int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STableMeta** pTableMeta, int32_t *exist) {
  if (NULL == pCtg->dbCache) {
D
dapan1121 已提交
514
    *exist = 0;
D
dapan1121 已提交
515
    ctgWarn("empty tbmeta cache, tbName:%s", pTableName->tname);
D
dapan1121 已提交
516 517 518
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
519 520
  char dbFName[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, dbFName);
D
dapan1121 已提交
521

D
dapan1121 已提交
522 523
  *pTableMeta = NULL;

D
dapan1121 已提交
524 525
  SCtgDBCache *dbCache = NULL;
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
D
dapan1121 已提交
526 527 528 529 530
  if (NULL == dbCache) {
    *exist = 0;
    return TSDB_CODE_SUCCESS;
  }
  
D
dapan1121 已提交
531 532 533 534 535
  size_t sz = 0;  
  CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);
  STableMeta *tbMeta = taosHashGetCloneExt(dbCache->tbCache.metaCache, pTableName->tname, strlen(pTableName->tname), NULL, (void **)pTableMeta, &sz);
  CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);

D
dapan1121 已提交
536
  if (NULL == *pTableMeta) {
D
dapan1121 已提交
537
    *exist = 0;
D
dapan1121 已提交
538 539
    ctgReleaseDBCache(pCtg, dbCache);
    ctgDebug("tbl not in cache, dbFName:%s, tbName:%s", dbFName, pTableName->tname);
D
dapan1121 已提交
540 541 542
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
543
  *exist = 1;
D
dapan1121 已提交
544 545
  
  tbMeta = *pTableMeta;
D
dapan1121 已提交
546

D
dapan1121 已提交
547
  if (tbMeta->tableType != TSDB_CHILD_TABLE) {
D
dapan1121 已提交
548 549
    ctgReleaseDBCache(pCtg, dbCache);
    ctgDebug("Got tbl from cache, type:%d, dbFName:%s, tbName:%s", tbMeta->tableType, dbFName, pTableName->tname);
D
dapan1121 已提交
550 551 552
    return TSDB_CODE_SUCCESS;
  }
  
D
dapan1121 已提交
553
  CTG_LOCK(CTG_READ, &dbCache->tbCache.stbLock);
D
dapan1121 已提交
554
  
D
dapan1121 已提交
555
  STableMeta **stbMeta = taosHashGet(dbCache->tbCache.stbCache, &tbMeta->suid, sizeof(tbMeta->suid));
D
dapan1121 已提交
556
  if (NULL == stbMeta || NULL == *stbMeta) {
D
dapan1121 已提交
557
    CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
D
dapan1121 已提交
558 559
    ctgReleaseDBCache(pCtg, dbCache);
    ctgError("stb not in stbCache, suid:%"PRIx64, tbMeta->suid);
D
dapan1121 已提交
560 561 562 563
    tfree(*pTableMeta);
    *exist = 0;
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
564

D
dapan1121 已提交
565
  if ((*stbMeta)->suid != tbMeta->suid) {    
D
dapan1121 已提交
566
    CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
D
dapan1121 已提交
567
    ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
568
    tfree(*pTableMeta);
D
dapan1121 已提交
569
    ctgError("stable suid in stbCache mis-match, expected suid:%"PRIx64 ",actual suid:%"PRIx64, tbMeta->suid, (*stbMeta)->suid);
D
dapan1121 已提交
570 571
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }
D
dapan1121 已提交
572

D
dapan1121 已提交
573
  int32_t metaSize = CTG_META_SIZE(*stbMeta);
D
dapan1121 已提交
574 575
  *pTableMeta = realloc(*pTableMeta, metaSize);
  if (NULL == *pTableMeta) {    
D
dapan1121 已提交
576
    CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
D
dapan1121 已提交
577
    ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
578
    ctgError("realloc size[%d] failed", metaSize);
D
dapan1121 已提交
579
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
580 581
  }

D
dapan1121 已提交
582 583
  memcpy(&(*pTableMeta)->sversion, &(*stbMeta)->sversion, metaSize - sizeof(SCTableMeta));

D
dapan1121 已提交
584
  CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
D
dapan1121 已提交
585

D
dapan1121 已提交
586
  ctgReleaseDBCache(pCtg, dbCache);
D
dapan 已提交
587

D
dapan1121 已提交
588
  ctgDebug("Got tbmeta from cache, dbFName:%s, tbName:%s", dbFName, pTableName->tname);
D
dapan1121 已提交
589 590 591 592
  
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
593 594
int32_t ctgGetTableTypeFromCache(SCatalog* pCtg, const SName* pTableName, int32_t *tbType) {
  if (NULL == pCtg->dbCache) {
D
dapan1121 已提交
595
    ctgWarn("empty db cache, tbName:%s", pTableName->tname);  
D
dapan1121 已提交
596 597 598
    return TSDB_CODE_SUCCESS;
  }

D
dapan 已提交
599 600
  char dbFName[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, dbFName);
D
dapan1121 已提交
601

D
dapan 已提交
602 603
  SCtgDBCache *dbCache = NULL;
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
D
dapan1121 已提交
604 605 606
  if (NULL == dbCache) {
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
607

D
dapan1121 已提交
608 609 610
  CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);
  STableMeta *pTableMeta = (STableMeta *)taosHashAcquire(dbCache->tbCache.metaCache, pTableName->tname, strlen(pTableName->tname));

D
dapan1121 已提交
611
  if (NULL == pTableMeta) {
D
dapan1121 已提交
612
    CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
D
dapan 已提交
613 614
    ctgWarn("tbl not in cache, dbFName:%s, tbName:%s", dbFName, pTableName->tname);  
    ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
615
    
D
dapan1121 已提交
616 617 618
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
619 620
  *tbType = atomic_load_8(&pTableMeta->tableType);

D
dapan1121 已提交
621 622 623 624
  taosHashRelease(dbCache->tbCache.metaCache, pTableMeta);

  CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);

D
dapan 已提交
625
  ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
626

D
dapan 已提交
627
  ctgDebug("Got tbtype from cache, dbFName:%s, tbName:%s, type:%d", dbFName, pTableName->tname, *tbType);  
D
dapan1121 已提交
628 629 630 631
  
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
632
int32_t ctgGetTableMetaFromMnodeImpl(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgmtEps, char *dbFName, char* tbName, STableMetaOutput* output) {
D
dapan1121 已提交
633
  SBuildTableMetaInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = tbName};
D
dapan1121 已提交
634 635 636 637
  char *msg = NULL;
  SEpSet *pVnodeEpSet = NULL;
  int32_t msgLen = 0;

D
dapan1121 已提交
638
  ctgDebug("try to get table meta from mnode, dbFName:%s, tbName:%s", dbFName, tbName);
D
dapan1121 已提交
639 640 641 642 643 644

  int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_MND_STB_META)](&bInput, &msg, 0, &msgLen);
  if (code) {
    ctgError("Build mnode stablemeta msg failed, code:%x", code);
    CTG_ERR_RET(code);
  }
D
dapan1121 已提交
645 646 647 648 649 650

  SRpcMsg rpcMsg = {
      .msgType = TDMT_MND_STB_META,
      .pCont   = msg,
      .contLen = msgLen,
  };
D
dapan1121 已提交
651

D
dapan1121 已提交
652 653
  SRpcMsg rpcRsp = {0};

D
dapan1121 已提交
654
  rpcSendRecv(pTransporter, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
655 656
  
  if (TSDB_CODE_SUCCESS != rpcRsp.code) {
D
dapan1121 已提交
657
    if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) {
D
dapan1121 已提交
658
      SET_META_TYPE_NULL(output->metaType);
D
dapan1121 已提交
659
      ctgDebug("stablemeta not exist in mnode, dbFName:%s, tbName:%s", dbFName, tbName);
D
dapan1121 已提交
660 661 662
      return TSDB_CODE_SUCCESS;
    }
    
D
dapan1121 已提交
663
    ctgError("error rsp for stablemeta from mnode, code:%s, dbFName:%s, tbName:%s", tstrerror(rpcRsp.code), dbFName, tbName);
D
dapan1121 已提交
664 665 666
    CTG_ERR_RET(rpcRsp.code);
  }

D
dapan1121 已提交
667 668
  code = queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_STB_META)](output, rpcRsp.pCont, rpcRsp.contLen);
  if (code) {
D
dapan1121 已提交
669
    ctgError("Process mnode stablemeta rsp failed, code:%x, dbFName:%s, tbName:%s", code, dbFName, tbName);
D
dapan1121 已提交
670 671 672
    CTG_ERR_RET(code);
  }

D
dapan1121 已提交
673
  ctgDebug("Got table meta from mnode, dbFName:%s, tbName:%s", dbFName, tbName);
D
dapan1121 已提交
674 675 676 677

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
678
int32_t ctgGetTableMetaFromMnode(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMetaOutput* output) {
D
dapan1121 已提交
679 680
  char dbFName[TSDB_DB_FNAME_LEN];
  tNameGetFullDbName(pTableName, dbFName);
D
dapan1121 已提交
681

D
dapan1121 已提交
682
  return ctgGetTableMetaFromMnodeImpl(pCtg, pTransporter, pMgmtEps, dbFName, (char *)pTableName->tname, output);
D
dapan1121 已提交
683
}
D
dapan1121 已提交
684

D
dapan1121 已提交
685 686
int32_t ctgGetTableMetaFromVnode(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) {
  if (NULL == pCtg || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName || NULL == vgroupInfo || NULL == output) {
D
dapan1121 已提交
687
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
688 689
  }

D
dapan1121 已提交
690 691
  char dbFName[TSDB_DB_FNAME_LEN];
  tNameGetFullDbName(pTableName, dbFName);
D
dapan1121 已提交
692

D
dapan1121 已提交
693
  ctgDebug("try to get table meta from vnode, dbFName:%s, tbName:%s", dbFName, tNameGetTableName(pTableName));
D
dapan1121 已提交
694

D
dapan1121 已提交
695
  SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char *)tNameGetTableName(pTableName)};
D
dapan1121 已提交
696 697 698
  char *msg = NULL;
  int32_t msgLen = 0;

D
dapan1121 已提交
699 700
  int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)](&bInput, &msg, 0, &msgLen);
  if (code) {
D
dapan1121 已提交
701
    ctgError("Build vnode tablemeta msg failed, code:%x, dbFName:%s, tbName:%s", code, dbFName, tNameGetTableName(pTableName));
D
dapan1121 已提交
702 703
    CTG_ERR_RET(code);
  }
D
dapan1121 已提交
704 705

  SRpcMsg rpcMsg = {
H
Hongze Cheng 已提交
706
      .msgType = TDMT_VND_TABLE_META,
D
dapan1121 已提交
707 708 709 710 711
      .pCont   = msg,
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
H
Haojun Liao 已提交
712
  rpcSendRecv(pTransporter, &vgroupInfo->epset, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
713
  
D
dapan1121 已提交
714
  if (TSDB_CODE_SUCCESS != rpcRsp.code) {
D
dapan1121 已提交
715
    if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) {
D
dapan1121 已提交
716
      SET_META_TYPE_NULL(output->metaType);
D
dapan1121 已提交
717
      ctgDebug("tablemeta not exist in vnode, dbFName:%s, tbName:%s", dbFName, tNameGetTableName(pTableName));
D
dapan1121 已提交
718 719 720
      return TSDB_CODE_SUCCESS;
    }
  
D
dapan1121 已提交
721
    ctgError("error rsp for table meta from vnode, code:%s, dbFName:%s, tbName:%s", tstrerror(rpcRsp.code), dbFName, tNameGetTableName(pTableName));
D
dapan1121 已提交
722
    CTG_ERR_RET(rpcRsp.code);
D
dapan1121 已提交
723 724
  }

D
dapan1121 已提交
725 726
  code = queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)](output, rpcRsp.pCont, rpcRsp.contLen);
  if (code) {
D
dapan1121 已提交
727
    ctgError("Process vnode tablemeta rsp failed, code:%s, dbFName:%s, tbName:%s", tstrerror(code), dbFName, tNameGetTableName(pTableName));
D
dapan1121 已提交
728 729 730
    CTG_ERR_RET(code);
  }

D
dapan1121 已提交
731
  ctgDebug("Got table meta from vnode, dbFName:%s, tbName:%s", dbFName, tNameGetTableName(pTableName));
D
dapan1121 已提交
732 733 734 735
  return TSDB_CODE_SUCCESS;
}


736 737
int32_t ctgGetHashFunction(int8_t hashMethod, tableNameHashFp *fp) {
  switch (hashMethod) {
D
dapan1121 已提交
738 739 740 741 742 743 744 745
    default:
      *fp = MurmurHash3_32;
      break;
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
746
int32_t ctgGenerateVgList(SCatalog *pCtg, SHashObj *vgHash, SArray** pList) {
D
dapan1121 已提交
747
  SHashObj *vgroupHash = NULL;
748
  SVgroupInfo *vgInfo = NULL;
D
dapan1121 已提交
749 750
  SArray *vgList = NULL;
  int32_t code = 0;
D
dapan1121 已提交
751
  int32_t vgNum = taosHashGetSize(vgHash);
752

D
dapan1121 已提交
753
  vgList = taosArrayInit(vgNum, sizeof(SVgroupInfo));
D
dapan1121 已提交
754
  if (NULL == vgList) {
D
dapan1121 已提交
755
    ctgError("taosArrayInit failed, num:%d", vgNum);
D
dapan 已提交
756 757 758
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);    
  }

D
dapan1121 已提交
759
  void *pIter = taosHashIterate(vgHash, NULL);
760 761
  while (pIter) {
    vgInfo = pIter;
D
dapan1121 已提交
762

D
dapan1121 已提交
763
    if (NULL == taosArrayPush(vgList, vgInfo)) {
D
dapan1121 已提交
764
      ctgError("taosArrayPush failed, vgId:%d", vgInfo->vgId);
D
dapan1121 已提交
765
      taosHashCancelIterate(vgHash, pIter);      
D
dapan1121 已提交
766
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
767 768
    }
    
D
dapan1121 已提交
769
    pIter = taosHashIterate(vgHash, pIter);
770
    vgInfo = NULL;
D
dapan1121 已提交
771 772
  }

D
dapan1121 已提交
773
  *pList = vgList;
D
dapan1121 已提交
774

D
dapan1121 已提交
775
  ctgDebug("Got vgList from cache, vgNum:%d", vgNum);
D
dapan1121 已提交
776

D
dapan1121 已提交
777
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
778 779 780 781 782 783 784 785

_return:

  if (vgList) {
    taosArrayDestroy(vgList);
  }

  CTG_RET(code);
D
dapan1121 已提交
786 787
}

D
dapan1121 已提交
788
int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup) {
D
dapan1121 已提交
789 790
  int32_t code = 0;
  
D
dapan1121 已提交
791
  int32_t vgNum = taosHashGetSize(dbInfo->vgHash);
H
Haojun Liao 已提交
792 793 794
  char db[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, db);

795
  if (vgNum <= 0) {
D
dapan1121 已提交
796
    ctgError("db vgroup cache invalid, db:%s, vgroup number:%d", db, vgNum);
D
dapan1121 已提交
797
    CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED);
D
dapan1121 已提交
798 799
  }

800 801
  tableNameHashFp fp = NULL;
  SVgroupInfo *vgInfo = NULL;
D
dapan1121 已提交
802

D
dapan1121 已提交
803
  CTG_ERR_RET(ctgGetHashFunction(dbInfo->hashMethod, &fp));
804 805

  char tbFullName[TSDB_TABLE_FNAME_LEN];
H
Haojun Liao 已提交
806
  tNameExtractFullName(pTableName, tbFullName);
807 808 809

  uint32_t hashValue = (*fp)(tbFullName, (uint32_t)strlen(tbFullName));

D
dapan1121 已提交
810
  void *pIter = taosHashIterate(dbInfo->vgHash, NULL);
811 812 813
  while (pIter) {
    vgInfo = pIter;
    if (hashValue >= vgInfo->hashBegin && hashValue <= vgInfo->hashEnd) {
D
dapan1121 已提交
814
      taosHashCancelIterate(dbInfo->vgHash, pIter);
815
      break;
D
dapan1121 已提交
816
    }
817
    
D
dapan1121 已提交
818
    pIter = taosHashIterate(dbInfo->vgHash, pIter);
819
    vgInfo = NULL;
D
dapan1121 已提交
820 821
  }

822
  if (NULL == vgInfo) {
D
dapan1121 已提交
823 824
    ctgError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, db, taosHashGetSize(dbInfo->vgHash));
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
825 826 827 828
  }

  *pVgroup = *vgInfo;

829
  CTG_RET(code);
D
dapan1121 已提交
830 831
}

D
dapan1121 已提交
832
int32_t ctgSTableVersionCompare(const void* key1, const void* key2) {
D
dapan 已提交
833
  if (*(uint64_t *)key1 < ((SSTableMetaVersion*)key2)->suid) {
D
dapan1121 已提交
834
    return -1;
D
dapan 已提交
835
  } else if (*(uint64_t *)key1 > ((SSTableMetaVersion*)key2)->suid) {
D
dapan1121 已提交
836 837 838 839 840 841 842
    return 1;
  } else {
    return 0;
  }
}

int32_t ctgDbVgVersionCompare(const void* key1, const void* key2) {
D
dapan1121 已提交
843
  if (*(int64_t *)key1 < ((SDbVgVersion*)key2)->dbId) {
D
dapan1121 已提交
844
    return -1;
D
dapan1121 已提交
845
  } else if (*(int64_t *)key1 > ((SDbVgVersion*)key2)->dbId) {
D
dapan1121 已提交
846 847 848
    return 1;
  } else {
    return 0;
D
dapan1121 已提交
849
  }
D
dapan1121 已提交
850 851
}

D
dapan1121 已提交
852
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) {
D
dapan1121 已提交
853 854 855 856
  mgmt->slotRIdx = 0;
  mgmt->slotNum = rentSec / CTG_RENT_SLOT_SECOND;
  mgmt->type = type;

D
dapan1121 已提交
857
  size_t msgSize = sizeof(SCtgRentSlot) * mgmt->slotNum;
D
dapan1121 已提交
858
  
D
dapan1121 已提交
859 860 861
  mgmt->slots = calloc(1, msgSize);
  if (NULL == mgmt->slots) {
    qError("calloc %d failed", (int32_t)msgSize);
D
dapan 已提交
862
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
863
  }
D
dapan1121 已提交
864

D
dapan1121 已提交
865 866 867 868
  qDebug("meta rent initialized, type:%d, slotNum:%d", type, mgmt->slotNum);
  
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
869

D
dapan1121 已提交
870

D
dapan1121 已提交
871
int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size) {
D
dapan1121 已提交
872
  int16_t widx = abs(id % mgmt->slotNum);
D
dapan1121 已提交
873

D
dapan1121 已提交
874
  SCtgRentSlot *slot = &mgmt->slots[widx];
D
dapan1121 已提交
875 876 877 878 879 880 881 882
  int32_t code = 0;
  
  CTG_LOCK(CTG_WRITE, &slot->lock);
  if (NULL == slot->meta) {
    slot->meta = taosArrayInit(CTG_DEFAULT_RENT_SLOT_SIZE, size);
    if (NULL == slot->meta) {
      qError("taosArrayInit %d failed, id:%"PRIx64", slot idx:%d, type:%d", CTG_DEFAULT_RENT_SLOT_SIZE, id, widx, mgmt->type);
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
883
    }
D
dapan1121 已提交
884
  }
D
dapan1121 已提交
885

D
dapan1121 已提交
886 887 888
  if (NULL == taosArrayPush(slot->meta, meta)) {
    qError("taosArrayPush meta to rent failed, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
889 890
  }

D
dapan1121 已提交
891
  slot->needSort = true;
D
dapan1121 已提交
892

D
dapan1121 已提交
893
  qDebug("add meta to rent, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
D
dapan1121 已提交
894

D
dapan1121 已提交
895 896 897 898 899 900
_return:

  CTG_UNLOCK(CTG_WRITE, &slot->lock);
  CTG_RET(code);
}

D
dapan1121 已提交
901
int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t compare) {
D
dapan1121 已提交
902
  int16_t widx = abs(id % mgmt->slotNum);
D
dapan1121 已提交
903

D
dapan1121 已提交
904
  SCtgRentSlot *slot = &mgmt->slots[widx];
D
dapan1121 已提交
905 906 907 908
  int32_t code = 0;
  
  CTG_LOCK(CTG_WRITE, &slot->lock);
  if (NULL == slot->meta) {
D
dapan1121 已提交
909 910
    qError("empty meta slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
911 912 913
  }

  if (slot->needSort) {
D
dapan1121 已提交
914
    qDebug("meta slot before sorte, slot idx:%d, type:%d, size:%d", widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta));
D
dapan1121 已提交
915 916
    taosArraySort(slot->meta, compare);
    slot->needSort = false;
D
dapan1121 已提交
917
    qDebug("meta slot sorted, slot idx:%d, type:%d, size:%d", widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta));
D
dapan1121 已提交
918 919 920
  }

  void *orig = taosArraySearch(slot->meta, &id, compare, TD_EQ);
D
dapan1121 已提交
921
  if (NULL == orig) {
D
dapan1121 已提交
922
    qError("meta not found in slot, id:%"PRIx64", slot idx:%d, type:%d, size:%d", id, widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta));
D
dapan1121 已提交
923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  memcpy(orig, meta, size);

  qDebug("meta in rent updated, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);

_return:

  CTG_UNLOCK(CTG_WRITE, &slot->lock);

  if (code) {
    qWarn("meta in rent update failed, will try to add it, code:%x, id:%"PRIx64", slot idx:%d, type:%d", code, id, widx, mgmt->type);
    CTG_RET(ctgMetaRentAdd(mgmt, meta, id, size));
  }

  CTG_RET(code);
}

D
dapan1121 已提交
942
int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t compare) {
D
dapan1121 已提交
943 944
  int16_t widx = abs(id % mgmt->slotNum);

D
dapan1121 已提交
945
  SCtgRentSlot *slot = &mgmt->slots[widx];
D
dapan1121 已提交
946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977
  int32_t code = 0;
  
  CTG_LOCK(CTG_WRITE, &slot->lock);
  if (NULL == slot->meta) {
    qError("empty meta slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  if (slot->needSort) {
    taosArraySort(slot->meta, compare);
    slot->needSort = false;
    qDebug("meta slot sorted, slot idx:%d, type:%d", widx, mgmt->type);
  }

  int32_t idx = taosArraySearchIdx(slot->meta, &id, compare, TD_EQ);
  if (idx < 0) {
    qError("meta not found in slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  taosArrayRemove(slot->meta, idx);

  qDebug("meta in rent removed, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);

_return:

  CTG_UNLOCK(CTG_WRITE, &slot->lock);

  CTG_RET(code);
}


D
dapan1121 已提交
978
int32_t ctgMetaRentGetImpl(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
D
dapan1121 已提交
979 980 981 982
  int16_t ridx = atomic_add_fetch_16(&mgmt->slotRIdx, 1);
  if (ridx >= mgmt->slotNum) {
    ridx %= mgmt->slotNum;
    atomic_store_16(&mgmt->slotRIdx, ridx);
D
dapan1121 已提交
983
  }
D
dapan1121 已提交
984

D
dapan1121 已提交
985
  SCtgRentSlot *slot = &mgmt->slots[ridx];
D
dapan1121 已提交
986
  int32_t code = 0;
D
dapan1121 已提交
987
  
D
dapan1121 已提交
988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023
  CTG_LOCK(CTG_READ, &slot->lock);
  if (NULL == slot->meta) {
    qDebug("empty meta in slot:%d, type:%d", ridx, mgmt->type);
    *num = 0;
    goto _return;
  }

  size_t metaNum = taosArrayGetSize(slot->meta);
  if (metaNum <= 0) {
    qDebug("no meta in slot:%d, type:%d", ridx, mgmt->type);
    *num = 0;
    goto _return;
  }

  size_t msize = metaNum * size;
  *res = malloc(msize);
  if (NULL == *res) {
    qError("malloc %d failed", (int32_t)msize);
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
  }

  void *meta = taosArrayGet(slot->meta, 0);

  memcpy(*res, meta, msize);

  *num = (uint32_t)metaNum;

  qDebug("Got %d meta from rent, type:%d", (int32_t)metaNum, mgmt->type);

_return:

  CTG_UNLOCK(CTG_READ, &slot->lock);

  CTG_RET(code);
}

D
dapan1121 已提交
1024
int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
D
dapan1121 已提交
1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043
  while (true) {
    int64_t msec = taosGetTimestampMs();
    int64_t lsec = atomic_load_64(&mgmt->lastReadMsec);
    if ((msec - lsec) < CTG_RENT_SLOT_SECOND * 1000) {
      *res = NULL;
      *num = 0;
      qDebug("too short time period to get expired meta, type:%d", mgmt->type);
      return TSDB_CODE_SUCCESS;
    }

    if (lsec != atomic_val_compare_exchange_64(&mgmt->lastReadMsec, lsec, msec)) {
      continue;
    }

    break;
  }

  CTG_ERR_RET(ctgMetaRentGetImpl(mgmt, res, num, size));

D
dapan1121 已提交
1044 1045 1046
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1047
int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
D
dapan1121 已提交
1048
  int32_t code = 0;
D
dapan1121 已提交
1049

D
dapan1121 已提交
1050 1051 1052
  SCtgDBCache newDBCache = {0};
  newDBCache.dbId = dbId;

D
dapan 已提交
1053
  newDBCache.tbCache.metaCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
D
dapan1121 已提交
1054
  if (NULL == newDBCache.tbCache.metaCache) {
D
dapan 已提交
1055
    ctgError("taosHashInit %d metaCache failed", gCtgMgmt.cfg.maxTblCacheNum);
D
dapan1121 已提交
1056 1057 1058
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
  }

D
dapan 已提交
1059
  newDBCache.tbCache.stbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
D
dapan1121 已提交
1060
  if (NULL == newDBCache.tbCache.stbCache) {
D
dapan 已提交
1061
    ctgError("taosHashInit %d stbCache failed", gCtgMgmt.cfg.maxTblCacheNum);
D
dapan1121 已提交
1062 1063 1064 1065
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
  }

  code = taosHashPut(pCtg->dbCache, dbFName, strlen(dbFName), &newDBCache, sizeof(SCtgDBCache));
D
dapan1121 已提交
1066 1067 1068
  if (code) {
    if (HASH_NODE_EXIST(code)) {
      ctgDebug("db already in cache, dbFName:%s", dbFName);
D
dapan1121 已提交
1069
      goto _return;
D
dapan1121 已提交
1070 1071 1072
    }
    
    ctgError("taosHashPut db to cache failed, dbFName:%s", dbFName);
D
dapan1121 已提交
1073 1074 1075
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
  }
  
D
dapan1121 已提交
1076
  SDbVgVersion vgVersion = {.dbId = newDBCache.dbId, .vgVersion = -1};
D
dapan1121 已提交
1077 1078
  strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));

D
dapan1121 已提交
1079
  ctgDebug("db added to cache, dbFName:%s, dbId:%"PRIx64, dbFName, dbId);
D
dapan1121 已提交
1080

D
dapan1121 已提交
1081 1082 1083
  CTG_ERR_RET(ctgMetaRentAdd(&pCtg->dbRent, &vgVersion, dbId, sizeof(SDbVgVersion)));

  ctgDebug("db added to rent, dbFName:%s, vgVersion:%d, dbId:%"PRIx64, dbFName, vgVersion.vgVersion, dbId);
D
dapan1121 已提交
1084

D
dapan1121 已提交
1085
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1086

D
dapan1121 已提交
1087
_return:
D
dapan1121 已提交
1088

D
dapan1121 已提交
1089
  ctgFreeDbCache(&newDBCache);
D
dapan1121 已提交
1090

D
dapan1121 已提交
1091 1092
  CTG_RET(code);
}
D
dapan1121 已提交
1093

D
dapan1121 已提交
1094

D
dapan1121 已提交
1095
void ctgRemoveStbRent(SCatalog* pCtg, SCtgTbMetaCache *cache) {
D
dapan1121 已提交
1096 1097 1098 1099 1100 1101 1102
  CTG_LOCK(CTG_WRITE, &cache->stbLock);
  if (cache->stbCache) {
    void *pIter = taosHashIterate(cache->stbCache, NULL);
    while (pIter) {
      uint64_t *suid = NULL;
      taosHashGetKey(pIter, (void **)&suid, NULL);

D
dapan1121 已提交
1103
      if (TSDB_CODE_SUCCESS == ctgMetaRentRemove(&pCtg->stbRent, *suid, ctgSTableVersionCompare)) {
D
dapan1121 已提交
1104 1105 1106 1107 1108 1109 1110 1111 1112 1113
        ctgDebug("stb removed from rent, suid:%"PRIx64, *suid);
      }
          
      pIter = taosHashIterate(cache->stbCache, pIter);
    }
  }
  CTG_UNLOCK(CTG_WRITE, &cache->stbLock);
}


D
dapan1121 已提交
1114
int32_t ctgRemoveDB(SCatalog* pCtg, SCtgDBCache *dbCache, const char* dbFName) {
D
dapan 已提交
1115 1116 1117 1118
  uint64_t dbId = dbCache->dbId;
  
  ctgInfo("start to remove db from cache, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId);

D
dapan1121 已提交
1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131
  atomic_store_8(&dbCache->deleted, 1);

  ctgRemoveStbRent(pCtg, &dbCache->tbCache);

  ctgFreeDbCache(dbCache);

  ctgInfo("db removed from cache, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId);

  CTG_ERR_RET(ctgMetaRentRemove(&pCtg->dbRent, dbCache->dbId, ctgDbVgVersionCompare));
  
  ctgDebug("db removed from rent, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId);

  if (taosHashRemove(pCtg->dbCache, dbFName, strlen(dbFName))) {
D
dapan1121 已提交
1132 1133
    ctgInfo("taosHashRemove from dbCache failed, may be removed, dbFName:%s", dbFName);
    CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
D
dapan1121 已提交
1134
  }
D
dapan 已提交
1135 1136

  ctgInfo("db removed from cache, dbFName:%s, dbId:%"PRIx64, dbFName, dbId);
D
dapan1121 已提交
1137 1138 1139
  
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
1140 1141


D
dapan1121 已提交
1142
int32_t ctgGetAddDBCache(SCatalog* pCtg, const char *dbFName, uint64_t dbId, SCtgDBCache **pCache) {
D
dapan1121 已提交
1143 1144
  int32_t code = 0;
  SCtgDBCache *dbCache = NULL;
D
dapan1121 已提交
1145
  ctgGetDBCache(pCtg, dbFName, &dbCache);
D
dapan1121 已提交
1146
  
D
dapan1121 已提交
1147 1148
  if (dbCache) {
  // TODO OPEN IT
D
dapan1121 已提交
1149
#if 0    
D
dapan1121 已提交
1150 1151 1152 1153
    if (dbCache->dbId == dbId) {
      *pCache = dbCache;
      return TSDB_CODE_SUCCESS;
    }
D
dapan1121 已提交
1154
#else
D
dapan1121 已提交
1155 1156 1157
    if (0 == dbId) {
      *pCache = dbCache;
      return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1158 1159
    }

D
dapan1121 已提交
1160 1161 1162 1163 1164
    if (dbId && (dbCache->dbId == 0)) {
      dbCache->dbId = dbId;
      *pCache = dbCache;
      return TSDB_CODE_SUCCESS;
    }
D
dapan1121 已提交
1165
    
D
dapan1121 已提交
1166 1167 1168 1169 1170 1171
    if (dbCache->dbId == dbId) {
      *pCache = dbCache;
      return TSDB_CODE_SUCCESS;
    }
#endif
    CTG_ERR_RET(ctgRemoveDB(pCtg, dbCache, dbFName));
D
dapan1121 已提交
1172
  }
D
dapan1121 已提交
1173 1174
  
  CTG_ERR_RET(ctgAddNewDBCache(pCtg, dbFName, dbId));
D
dapan1121 已提交
1175

D
dapan1121 已提交
1176
  ctgGetDBCache(pCtg, dbFName, &dbCache);
D
dapan1121 已提交
1177

D
dapan1121 已提交
1178
  *pCache = dbCache;
D
dapan1121 已提交
1179

D
dapan1121 已提交
1180
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1181 1182 1183
}


D
dapan1121 已提交
1184 1185 1186 1187 1188 1189
int32_t ctgUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SDBVgInfo** pDbInfo) {
  int32_t code = 0;
  SDBVgInfo* dbInfo = *pDbInfo;
  
  if (NULL == dbInfo->vgHash || dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) {
    ctgError("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d", dbFName, dbInfo->vgHash, dbInfo->vgVersion);
D
dapan1121 已提交
1190
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
1191 1192
  }

D
dapan1121 已提交
1193
  bool newAdded = false;
D
dapan1121 已提交
1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204
  SDbVgVersion vgVersion = {.dbId = dbId, .vgVersion = dbInfo->vgVersion};

  SCtgDBCache *dbCache = NULL;
  CTG_ERR_RET(ctgGetAddDBCache(pCtg, dbFName, dbId, &dbCache));
  if (NULL == dbCache) {
    ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:%"PRIx64, dbFName, dbId);
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  SDBVgInfo *vgInfo = NULL;
  CTG_ERR_RET(ctgWAcquireVgInfo(pCtg, dbCache));
D
dapan1121 已提交
1205
  
D
dapan1121 已提交
1206 1207 1208 1209
  if (dbCache->vgInfo) {
    if (dbInfo->vgVersion <= dbCache->vgInfo->vgVersion) {
      ctgInfo("db vgVersion is old, dbFName:%s, vgVersion:%d, currentVersion:%d", dbFName, dbInfo->vgVersion, dbCache->vgInfo->vgVersion);
      ctgWReleaseVgInfo(dbCache);
D
dapan1121 已提交
1210
      
D
dapan1121 已提交
1211
      return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1212
    }
D
dapan1121 已提交
1213 1214

    ctgFreeVgInfo(dbCache->vgInfo);
D
dapan1121 已提交
1215 1216
  }

D
dapan1121 已提交
1217
  dbCache->vgInfo = dbInfo;
D
dapan1121 已提交
1218

D
dapan1121 已提交
1219
  *pDbInfo = NULL;
D
dapan1121 已提交
1220

D
dapan1121 已提交
1221
  ctgDebug("db vgInfo updated, dbFName:%s, vgVersion:%d, dbId:%"PRIx64, dbFName, vgVersion.vgVersion, vgVersion.dbId);
D
dapan1121 已提交
1222

D
dapan1121 已提交
1223
  ctgWReleaseVgInfo(dbCache);
D
dapan1121 已提交
1224

D
dapan1121 已提交
1225
  dbCache = NULL;
D
dapan1121 已提交
1226

D
dapan1121 已提交
1227 1228 1229
  strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
  CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion), ctgDbVgVersionCompare));
  
D
dapan1121 已提交
1230 1231 1232 1233
  CTG_RET(code);
}


D
dapan1121 已提交
1234 1235
int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, uint64_t dbId, char *tbName, STableMeta *meta, int32_t metaSize) {
  SCtgTbMetaCache *tbCache = &dbCache->tbCache;
D
dapan1121 已提交
1236

D
dapan1121 已提交
1237 1238 1239 1240 1241
  CTG_LOCK(CTG_READ, &tbCache->metaLock);
  if (dbCache->deleted || NULL == tbCache->metaCache || NULL == tbCache->stbCache) {
    CTG_UNLOCK(CTG_READ, &tbCache->metaLock);    
    ctgError("db is dropping, dbId:%"PRIx64, dbCache->dbId);
    CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
D
dapan1121 已提交
1242 1243
  }

D
dapan1121 已提交
1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257
  int8_t origType = 0;
  uint64_t origSuid = 0;
  bool isStb = meta->tableType == TSDB_SUPER_TABLE;
  STableMeta *orig = taosHashGet(tbCache->metaCache, tbName, strlen(tbName));
  if (orig) {
    origType = orig->tableType;
    origSuid = orig->suid;
    
    if (origType == TSDB_SUPER_TABLE && ((!isStb) || origSuid != meta->suid)) {
      CTG_LOCK(CTG_WRITE, &tbCache->stbLock);
      if (taosHashRemove(tbCache->stbCache, &orig->suid, sizeof(orig->suid))) {
        ctgError("stb not exist in stbCache, dbFName:%s, stb:%s, suid:%"PRIx64, dbFName, tbName, orig->suid);
      }
      CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
D
dapan1121 已提交
1258

D
dapan1121 已提交
1259 1260 1261 1262
      ctgDebug("stb removed from stbCache, dbFName:%s, stb:%s, suid:%"PRIx64, dbFName, tbName, orig->suid);
      
      ctgMetaRentRemove(&pCtg->stbRent, orig->suid, ctgSTableVersionCompare);
    }
D
dapan1121 已提交
1263
  }
D
dapan1121 已提交
1264

D
dapan1121 已提交
1265 1266
  if (isStb) {
    CTG_LOCK(CTG_WRITE, &tbCache->stbLock);
D
dapan1121 已提交
1267
  }
D
dapan1121 已提交
1268
  
D
dapan1121 已提交
1269 1270 1271 1272 1273 1274 1275 1276 1277
  if (taosHashPut(tbCache->metaCache, tbName, strlen(tbName), meta, metaSize) != 0) {
    if (isStb) {
      CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
    }
    
    CTG_UNLOCK(CTG_READ, &tbCache->metaLock);  
    ctgError("taosHashPut tbmeta to cache failed, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
  }
D
dapan1121 已提交
1278

D
dapan 已提交
1279 1280
  ctgDebug("tbmeta updated to cache, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);

D
dapan1121 已提交
1281 1282 1283
  if (!isStb) {
    CTG_UNLOCK(CTG_READ, &tbCache->metaLock);  
    return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1284
  }
D
dapan1121 已提交
1285

D
dapan1121 已提交
1286 1287 1288 1289 1290
  if (isStb && origSuid == meta->suid) {
    CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
    CTG_UNLOCK(CTG_READ, &tbCache->metaLock);  
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
1291

D
dapan1121 已提交
1292 1293 1294 1295 1296 1297
  STableMeta *tbMeta = taosHashGet(tbCache->metaCache, tbName, strlen(tbName));
  if (taosHashPut(tbCache->stbCache, &meta->suid, sizeof(meta->suid), &tbMeta, POINTER_BYTES) != 0) {
    CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
    CTG_UNLOCK(CTG_READ, &tbCache->metaLock);    
    ctgError("taosHashPutExt stable to stable cache failed, suid:%"PRIx64, meta->suid);
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
1298
  }
D
dapan1121 已提交
1299 1300
  
  CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
D
dapan1121 已提交
1301

D
dapan1121 已提交
1302 1303
  CTG_UNLOCK(CTG_READ, &tbCache->metaLock);

D
dapan 已提交
1304
  ctgDebug("stb updated to stbCache, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
D
dapan1121 已提交
1305 1306 1307 1308 1309 1310 1311

  SSTableMetaVersion metaRent = {.dbId = dbId, .suid = meta->suid, .sversion = meta->sversion, .tversion = meta->tversion};
  strcpy(metaRent.dbFName, dbFName);
  strcpy(metaRent.stbName, tbName);
  CTG_ERR_RET(ctgMetaRentAdd(&pCtg->stbRent, &metaRent, metaRent.suid, sizeof(SSTableMetaVersion)));
  
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1312 1313
}

D
dapan 已提交
1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352
int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst) {
  *dst = malloc(sizeof(SDBVgInfo));
  if (NULL == *dst) {
    qError("malloc %d failed", (int32_t)sizeof(SDBVgInfo));
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
  }

  memcpy(*dst, src, sizeof(SDBVgInfo));

  size_t hashSize = taosHashGetSize(src->vgHash);
  (*dst)->vgHash = taosHashInit(hashSize, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (NULL == (*dst)->vgHash) {
    qError("taosHashInit %d failed", (int32_t)hashSize);
    tfree(*dst);
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
  }

  int32_t *vgId = NULL;
  void *pIter = taosHashIterate(src->vgHash, NULL);
  while (pIter) {
    taosHashGetKey(pIter, (void **)&vgId, NULL);

    if (taosHashPut((*dst)->vgHash, (void *)vgId, sizeof(int32_t), pIter, sizeof(SVgroupInfo))) {
      qError("taosHashPut failed, hashSize:%d", (int32_t)hashSize);
      taosHashCancelIterate(src->vgHash, pIter);
      taosHashCleanup((*dst)->vgHash);
      tfree(*dst);
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }
    
    pIter = taosHashIterate(src->vgHash, pIter);
  }


  return TSDB_CODE_SUCCESS;
}



D
dapan1121 已提交
1353
int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, bool forceUpdate, SCtgDBCache** dbCache, SDBVgInfo **pInfo) {
D
dapan1121 已提交
1354
  bool inCache = false;
D
dapan1121 已提交
1355
  int32_t code = 0;
1356
  if (!forceUpdate) {
D
dapan1121 已提交
1357
    CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, dbCache, &inCache));
D
dapan1121 已提交
1358
    if (inCache) {
D
dapan1121 已提交
1359 1360 1361 1362 1363 1364 1365
      return TSDB_CODE_SUCCESS;
    }
  }

  SUseDbOutput DbOut = {0};
  SBuildUseDBInput input = {0};

D
dapan1121 已提交
1366
  tstrncpy(input.db, dbFName, tListLen(input.db));
D
dapan1121 已提交
1367
  input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
H
Haojun Liao 已提交
1368

D
dapan 已提交
1369
  CTG_ERR_RET(ctgGetDBVgInfoFromMnode(pCtg, pRpc, pMgmtEps, &input, &DbOut));
D
dapan1121 已提交
1370

D
dapan 已提交
1371 1372 1373 1374 1375 1376 1377 1378
  CTG_ERR_JRET(ctgCloneVgInfo(DbOut.dbVgroup, pInfo));
  
  SCtgMetaAction action= {.act = CTG_ACT_UPDATE_VG};
  SCtgUpdateVgMsg *msg = malloc(sizeof(SCtgUpdateVgMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg));
    ctgFreeVgInfo(DbOut.dbVgroup);
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
1379 1380
  }

D
dapan 已提交
1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391
  strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  msg->pCtg = pCtg;
  msg->dbId = DbOut.dbId;
  msg->dbInfo = DbOut.dbVgroup;

  action.data = msg;

  CTG_ERR_JRET(ctgPushAction(&action));

  ctgDebug("action [%s] added into queue", gCtgAction[action.act].name);

D
dapan1121 已提交
1392
  return TSDB_CODE_SUCCESS;
D
dapan 已提交
1393 1394 1395 1396 1397 1398 1399 1400 1401

_return:

  tfree(*pInfo);
  tfree(msg);
  
  *pInfo = DbOut.dbVgroup;
  
  CTG_RET(code);
D
dapan1121 已提交
1402 1403
}

D
dapan 已提交
1404

D
dapan1121 已提交
1405 1406 1407 1408 1409
int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput) {
  *pOutput = malloc(sizeof(STableMetaOutput));
  if (NULL == *pOutput) {
    qError("malloc %d failed", (int32_t)sizeof(STableMetaOutput));
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan 已提交
1410 1411
  }

D
dapan1121 已提交
1412 1413 1414 1415 1416 1417
  memcpy(*pOutput, output, sizeof(STableMetaOutput));

  if (output->tbMeta) {
    int32_t metaSize = CTG_META_SIZE(output->tbMeta);
    (*pOutput)->tbMeta = malloc(metaSize);
    if (NULL == (*pOutput)->tbMeta) {
D
dapan 已提交
1418
      qError("malloc %d failed", (int32_t)sizeof(STableMetaOutput));
D
dapan1121 已提交
1419 1420 1421 1422 1423
      tfree(*pOutput);
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }

    memcpy((*pOutput)->tbMeta, output->tbMeta, metaSize);
D
dapan 已提交
1424 1425
  }

D
dapan1121 已提交
1426 1427 1428
  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
1429 1430


D
dapan1121 已提交
1431 1432
int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable, STableMetaOutput **pOutput) {
  if (NULL == pCtg || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) {
D
dapan1121 已提交
1433 1434 1435 1436 1437 1438
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  SVgroupInfo vgroupInfo = {0};
  int32_t code = 0;

D
dapan1121 已提交
1439
  CTG_ERR_RET(catalogGetTableHashVgroup(pCtg, pTransporter, pMgmtEps, pTableName, &vgroupInfo));
D
dapan1121 已提交
1440

D
dapan 已提交
1441
  SCtgUpdateTblMsg *msg = NULL;
D
dapan1121 已提交
1442 1443 1444 1445 1446 1447 1448
  STableMetaOutput  moutput = {0};
  STableMetaOutput *output = malloc(sizeof(STableMetaOutput));
  if (NULL == output) {
    ctgError("malloc %d failed", (int32_t)sizeof(STableMetaOutput));
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
  }
  
D
dapan1121 已提交
1449
  if (CTG_IS_STABLE(isSTable)) {
D
dapan 已提交
1450
    ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s", tNameGetTableName(pTableName));
D
dapan1121 已提交
1451 1452

    // if get from mnode failed, will not try vnode
D
dapan1121 已提交
1453
    CTG_ERR_JRET(ctgGetTableMetaFromMnode(pCtg, pTransporter, pMgmtEps, pTableName, output));
D
dapan1121 已提交
1454

D
dapan1121 已提交
1455 1456
    if (CTG_IS_META_NULL(output->metaType)) {
      CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCtg, pTransporter, pMgmtEps, pTableName, &vgroupInfo, output));
D
dapan1121 已提交
1457 1458
    }
  } else {
D
dapan 已提交
1459
    ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, isStable:%d", tNameGetTableName(pTableName), isSTable);
D
dapan1121 已提交
1460 1461

    // if get from vnode failed or no table meta, will not try mnode
D
dapan1121 已提交
1462
    CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCtg, pTransporter, pMgmtEps, pTableName, &vgroupInfo, output));
D
dapan1121 已提交
1463

D
dapan1121 已提交
1464
    if (CTG_IS_META_TABLE(output->metaType) && TSDB_SUPER_TABLE == output->tbMeta->tableType) {
D
dapan 已提交
1465
      ctgDebug("will continue to refresh tbmeta since got stb, tbName:%s, metaType:%d", tNameGetTableName(pTableName), output->metaType);
D
dapan1121 已提交
1466

D
dapan1121 已提交
1467
      tfree(output->tbMeta);
D
dapan1121 已提交
1468
      
D
dapan1121 已提交
1469 1470
      CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCtg, pTransporter, pMgmtEps, output->dbFName, output->tbName, output));
    } else if (CTG_IS_META_BOTH(output->metaType)) {
D
dapan1121 已提交
1471
      int32_t exist = 0;
D
dapan1121 已提交
1472
      CTG_ERR_JRET(ctgIsTableMetaExistInCache(pCtg, output->dbFName, output->tbName, &exist));
D
dapan1121 已提交
1473
      if (0 == exist) {
D
dapan1121 已提交
1474
        CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCtg, pTransporter, pMgmtEps, output->dbFName, output->tbName, &moutput));
D
dapan1121 已提交
1475

D
dapan1121 已提交
1476
        if (CTG_IS_META_NULL(moutput.metaType)) {
D
dapan1121 已提交
1477
          SET_META_TYPE_NULL(output->metaType);
D
dapan1121 已提交
1478 1479
        }
        
D
dapan1121 已提交
1480 1481
        tfree(output->tbMeta);
        output->tbMeta = moutput.tbMeta;
D
dapan1121 已提交
1482 1483
        moutput.tbMeta = NULL;
      } else {
D
dapan1121 已提交
1484
        tfree(output->tbMeta);
D
dapan1121 已提交
1485
        
D
dapan1121 已提交
1486
        SET_META_TYPE_CTABLE(output->metaType); 
D
dapan1121 已提交
1487
      }
D
dapan1121 已提交
1488 1489 1490
    }
  }

D
dapan1121 已提交
1491
  if (CTG_IS_META_NULL(output->metaType)) {
D
dapan 已提交
1492
    ctgError("no tbmeta got, tbNmae:%s", tNameGetTableName(pTableName));
D
dapan1121 已提交
1493 1494 1495
    CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
  }

D
dapan 已提交
1496 1497 1498 1499 1500 1501
  if (CTG_IS_META_TABLE(output->metaType)) {
    ctgDebug("tbmeta got, dbFName:%s, tbName:%s, tbType:%d", output->dbFName, output->tbName, output->tbMeta->tableType);
  } else {
    ctgDebug("tbmeta got, dbFName:%s, tbName:%s, tbType:%d, stbMetaGot:%d", output->dbFName, output->ctbName, output->ctbMeta.tableType, CTG_IS_META_BOTH(output->metaType));
  }

D
dapan1121 已提交
1502 1503 1504 1505 1506
  if (pOutput) {
    CTG_ERR_JRET(ctgCloneMetaOutput(output, pOutput));
  }

  SCtgMetaAction action= {.act = CTG_ACT_UPDATE_TBL};
D
dapan 已提交
1507
  msg = malloc(sizeof(SCtgUpdateTblMsg));
D
dapan1121 已提交
1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTblMsg));
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
  }

  msg->pCtg = pCtg;
  msg->output = output;

  action.data = msg;

  CTG_ERR_JRET(ctgPushAction(&action));

D
dapan 已提交
1520 1521
  ctgDebug("action [%s] added into queue", gCtgAction[action.act].name);

D
dapan1121 已提交
1522
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1523 1524 1525

_return:

D
dapan1121 已提交
1526
  tfree(output->tbMeta);
D
dapan 已提交
1527
  tfree(output);
D
dapan1121 已提交
1528
  tfree(msg);
D
dapan1121 已提交
1529 1530 1531 1532
  
  CTG_RET(code);
}

D
dapan1121 已提交
1533 1534
int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, bool forceUpdate, STableMeta** pTableMeta, int32_t isSTable) {
  if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) {
D
dapan1121 已提交
1535 1536 1537 1538
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }
  
  int32_t exist = 0;
D
dapan1121 已提交
1539
  int32_t code = 0;
D
dapan1121 已提交
1540 1541

  if (!forceUpdate) {
D
dapan1121 已提交
1542
    CTG_ERR_RET(ctgGetTableMetaFromCache(pCtg, pTableName, pTableMeta, &exist));
D
dapan1121 已提交
1543 1544 1545 1546

    if (exist && CTG_TBTYPE_MATCH(isSTable, (*pTableMeta)->tableType)) {
      return TSDB_CODE_SUCCESS;
    }
D
dapan1121 已提交
1547 1548

    tfree(*pTableMeta);
D
dapan1121 已提交
1549 1550 1551
  } else if (CTG_IS_UNKNOWN_STABLE(isSTable)) {
    int32_t tbType = 0;
    
D
dapan1121 已提交
1552
    CTG_ERR_RET(ctgGetTableTypeFromCache(pCtg, pTableName, &tbType));
D
dapan1121 已提交
1553 1554 1555 1556

    CTG_SET_STABLE(isSTable, tbType);
  }

D
dapan1121 已提交
1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571 1572 1573 1574 1575 1576 1577
  STableMetaOutput *output = NULL;

  CTG_ERR_JRET(ctgRefreshTblMeta(pCtg, pRpc, pMgmtEps, pTableName, isSTable, &output));

  if (CTG_IS_META_TABLE(output->metaType)) {
    *pTableMeta = output->tbMeta;
    goto _return;
  }

  if (CTG_IS_META_BOTH(output->metaType)) {
    memcpy(output->tbMeta, &output->ctbMeta, sizeof(output->ctbMeta));
    
    *pTableMeta = output->tbMeta;
    goto _return;
  }

  if ((!CTG_IS_META_CTABLE(output->metaType)) || output->tbMeta) {
    ctgError("invalid metaType:%d", output->metaType);
    tfree(output->tbMeta);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }
D
dapan1121 已提交
1578

D
dapan1121 已提交
1579
  // HANDLE ONLY CHILD TABLE META
D
dapan1121 已提交
1580

D
dapan1121 已提交
1581 1582 1583 1584
  SName stbName = *pTableName;
  strcpy(stbName.tname, output->tbName);
  
  CTG_ERR_JRET(ctgGetTableMetaFromCache(pCtg, &stbName, pTableMeta, &exist));
D
dapan1121 已提交
1585
  if (0 == exist) {
D
dapan1121 已提交
1586 1587 1588 1589 1590 1591 1592 1593 1594 1595
    ctgDebug("stb no longer exist, dbFName:%s, tbName:%s", output->dbFName, pTableName->tname);
    CTG_ERR_JRET(TSDB_CODE_VND_TB_NOT_EXIST);
  }

  memcpy(*pTableMeta, &output->ctbMeta, sizeof(output->ctbMeta));

_return:

  tfree(output);

D
dapan 已提交
1596 1597 1598 1599
  if (*pTableMeta) {
    ctgDebug("tbmeta returned, tbName:%s, tbType:%d", pTableName->tname, (*pTableMeta)->tableType);
  }

D
dapan1121 已提交
1600 1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624
  CTG_RET(code);
}



int32_t ctgActUpdateVg(SCtgMetaAction *action) {
  int32_t code = 0;
  SCtgUpdateVgMsg *msg = action->data;
  
  CTG_ERR_JRET(ctgUpdateDBVgInfo(msg->pCtg, msg->dbFName, msg->dbId, &msg->dbInfo));

_return:

  tfree(msg->dbInfo);
  tfree(msg);
  
  CTG_RET(code);
}

int32_t ctgActRemoveDB(SCtgMetaAction *action) {
  int32_t code = 0;
  SCtgRemoveDBMsg *msg = action->data;
  SCatalog* pCtg = msg->pCtg;

  SCtgDBCache *dbCache = NULL;
D
dapan 已提交
1625
  ctgGetDBCache(msg->pCtg, msg->dbFName, &dbCache);
D
dapan1121 已提交
1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653
  if (NULL == dbCache) {
    goto _return;
  }
  
  if (dbCache->dbId != msg->dbId) {
    ctgInfo("dbId already updated, dbFName:%s, dbId:%"PRIx64 ", targetId:%"PRIx64, msg->dbFName, dbCache->dbId, msg->dbId);
    goto _return;
  }
  
  CTG_ERR_JRET(ctgRemoveDB(pCtg, dbCache, msg->dbFName));

_return:

  tfree(msg);
  
  CTG_RET(code);
}


int32_t ctgActUpdateTbl(SCtgMetaAction *action) {
  int32_t code = 0;
  SCtgUpdateTblMsg *msg = action->data;
  SCatalog* pCtg = msg->pCtg;
  STableMetaOutput* output = msg->output;
  SCtgDBCache *dbCache = NULL;

  if ((!CTG_IS_META_CTABLE(output->metaType)) && NULL == output->tbMeta) {
    ctgError("no valid tbmeta got from meta rsp, dbFName:%s, tbName:%s", output->dbFName, output->tbName);
D
dapan 已提交
1654
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679
  }

  if (CTG_IS_META_BOTH(output->metaType) && TSDB_SUPER_TABLE != output->tbMeta->tableType) {
    ctgError("table type error, expected:%d, actual:%d", TSDB_SUPER_TABLE, output->tbMeta->tableType);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }    

  CTG_ERR_JRET(ctgGetAddDBCache(pCtg, output->dbFName, output->dbId, &dbCache));
  if (NULL == dbCache) {
    ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:%"PRIx64, output->dbFName, output->dbId);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  if (CTG_IS_META_TABLE(output->metaType) || CTG_IS_META_BOTH(output->metaType)) {
    int32_t metaSize = CTG_META_SIZE(output->tbMeta);
    
    CTG_ERR_JRET(ctgUpdateTblMeta(pCtg, dbCache, output->dbFName, output->dbId, output->tbName, output->tbMeta, metaSize));
  }

  if (CTG_IS_META_CTABLE(output->metaType) || CTG_IS_META_BOTH(output->metaType)) {
    CTG_ERR_JRET(ctgUpdateTblMeta(pCtg, dbCache, output->dbFName, output->dbId, output->ctbName, (STableMeta *)&output->ctbMeta, sizeof(output->ctbMeta)));
  }

_return:

D
dapan 已提交
1680 1681 1682
  if (output) {
    tfree(output->tbMeta);
    tfree(output);
D
dapan1121 已提交
1683
  }
D
dapan 已提交
1684
  
D
dapan1121 已提交
1685 1686 1687 1688 1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719
  tfree(msg);
  
  CTG_RET(code);
}


int32_t ctgActRemoveStb(SCtgMetaAction *action) {
  int32_t code = 0;
  SCtgRemoveStbMsg *msg = action->data;
  bool removed = false;
  SCatalog* pCtg = msg->pCtg;

  SCtgDBCache *dbCache = NULL;
  ctgGetDBCache(pCtg, msg->dbFName, &dbCache);
  if (NULL == dbCache) {
    return TSDB_CODE_SUCCESS;
  }

  if (dbCache->dbId != msg->dbId) {
    ctgDebug("dbId already modified, dbFName:%s, current:%"PRIx64", dbId:%"PRIx64", stb:%s, suid:%"PRIx64, msg->dbFName, dbCache->dbId, msg->dbId, msg->stbName, msg->suid);
    return TSDB_CODE_SUCCESS;
  }
  
  CTG_LOCK(CTG_WRITE, &dbCache->tbCache.stbLock);
  if (taosHashRemove(dbCache->tbCache.stbCache, &msg->suid, sizeof(msg->suid))) {
    CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock);
    ctgDebug("stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid);
    return TSDB_CODE_SUCCESS;
  }

  CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);
  if (taosHashRemove(dbCache->tbCache.metaCache, msg->stbName, strlen(msg->stbName))) {  
    CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
    CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock);
    ctgError("stb not exist in cache, dbFName:%s, stb:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid);
D
dapan1121 已提交
1720
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749
  }  
  CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
  
  CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock);
  
  ctgInfo("stb removed from cache, dbFName:%s, stbName:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid);

  CTG_ERR_JRET(ctgMetaRentRemove(&msg->pCtg->stbRent, msg->suid, ctgSTableVersionCompare));
  
  ctgDebug("stb removed from rent, dbFName:%s, stbName:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid);
  
_return:

  tfree(msg);
  
  CTG_RET(code);
}

int32_t ctgActRemoveTbl(SCtgMetaAction *action) {

}



void* ctgUpdateThreadFunc(void* param) {
  setThreadName("catalog");

  qInfo("catalog update thread started");

D
dapan 已提交
1750
  CTG_LOCK(CTG_READ, &gCtgMgmt.lock);
D
dapan1121 已提交
1751 1752
  
  while (true) {
D
dapan 已提交
1753
    tsem_wait(&gCtgMgmt.sem);
D
dapan1121 已提交
1754
    
D
dapan 已提交
1755
    if (atomic_load_8(&gCtgMgmt.exit)) {
D
dapan1121 已提交
1756 1757 1758 1759 1760 1761
      break;
    }

    SCtgMetaAction *action = NULL;
    ctgPopAction(&action);

D
dapan 已提交
1762 1763 1764 1765 1766
    qDebug("process %s action", gCtgAction[action->act].name);
    
    (*gCtgAction[action->act].func)(action);

    CTG_STAT_ADD(gCtgMgmt.stat.runtime.qDoneNum); 
D
dapan1121 已提交
1767 1768
  }

D
dapan 已提交
1769
  CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock);
D
dapan1121 已提交
1770 1771 1772 1773 1774 1775 1776 1777 1778 1779 1780 1781

  qInfo("catalog update thread stopped");
  
  return NULL;
}


int32_t ctgStartUpdateThread() {
  pthread_attr_t thAttr;
  pthread_attr_init(&thAttr);
  pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);

D
dapan 已提交
1782
  if (pthread_create(&gCtgMgmt.updateThread, &thAttr, ctgUpdateThreadFunc, NULL) != 0) {
D
dapan1121 已提交
1783 1784
    terrno = TAOS_SYSTEM_ERROR(errno);
    CTG_ERR_RET(terrno);
D
dapan1121 已提交
1785 1786
  }
  
D
dapan1121 已提交
1787
  pthread_attr_destroy(&thAttr);
D
dapan1121 已提交
1788 1789 1790 1791
  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
1792
int32_t catalogInit(SCatalogCfg *cfg) {
D
dapan 已提交
1793
  if (gCtgMgmt.pCluster) {
D
dapan 已提交
1794
    qError("catalog already initialized");
D
dapan1121 已提交
1795
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
1796 1797
  }

D
dapan 已提交
1798
  atomic_store_8(&gCtgMgmt.exit, false);
D
dapan1121 已提交
1799

D
dapan1121 已提交
1800
  if (cfg) {
D
dapan 已提交
1801
    memcpy(&gCtgMgmt.cfg, cfg, sizeof(*cfg));
H
Haojun Liao 已提交
1802

D
dapan 已提交
1803 1804
    if (gCtgMgmt.cfg.maxDBCacheNum == 0) {
      gCtgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
D
dapan1121 已提交
1805 1806
    }

D
dapan 已提交
1807 1808
    if (gCtgMgmt.cfg.maxTblCacheNum == 0) {
      gCtgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TBLMETA_NUMBER;
D
dapan1121 已提交
1809
    }
D
dapan1121 已提交
1810

D
dapan 已提交
1811 1812
    if (gCtgMgmt.cfg.dbRentSec == 0) {
      gCtgMgmt.cfg.dbRentSec = CTG_DEFAULT_RENT_SECOND;
D
dapan1121 已提交
1813 1814
    }

D
dapan 已提交
1815 1816
    if (gCtgMgmt.cfg.stbRentSec == 0) {
      gCtgMgmt.cfg.stbRentSec = CTG_DEFAULT_RENT_SECOND;
D
dapan1121 已提交
1817
    }
D
dapan1121 已提交
1818
  } else {
D
dapan 已提交
1819 1820 1821 1822
    gCtgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
    gCtgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TBLMETA_NUMBER;
    gCtgMgmt.cfg.dbRentSec = CTG_DEFAULT_RENT_SECOND;
    gCtgMgmt.cfg.stbRentSec = CTG_DEFAULT_RENT_SECOND;
D
dapan 已提交
1823 1824
  }

D
dapan 已提交
1825 1826
  gCtgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == gCtgMgmt.pCluster) {
D
dapan1121 已提交
1827 1828
    qError("taosHashInit %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER);
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
1829 1830
  }

D
dapan1121 已提交
1831 1832
  CTG_ERR_RET(ctgStartUpdateThread());

D
dapan 已提交
1833
  tsem_init(&gCtgMgmt.sem, 0, 0);
D
dapan1121 已提交
1834

D
dapan 已提交
1835 1836
  gCtgMgmt.head = calloc(1, sizeof(SCtgQNode));
  if (NULL == gCtgMgmt.head) {
D
dapan1121 已提交
1837 1838 1839
    qError("calloc %d failed", (int32_t)sizeof(SCtgQNode));
    CTG_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
D
dapan 已提交
1840
  gCtgMgmt.tail = gCtgMgmt.head;
D
dapan1121 已提交
1841

D
dapan 已提交
1842
  qDebug("catalog initialized, maxDb:%u, maxTbl:%u, dbRentSec:%u, stbRentSec:%u", gCtgMgmt.cfg.maxDBCacheNum, gCtgMgmt.cfg.maxTblCacheNum, gCtgMgmt.cfg.dbRentSec, gCtgMgmt.cfg.stbRentSec);
D
dapan1121 已提交
1843

D
dapan 已提交
1844
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1845 1846
}

D
dapan1121 已提交
1847
int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
1848
  if (NULL == catalogHandle) {
D
dapan1121 已提交
1849
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan 已提交
1850 1851
  }

D
dapan 已提交
1852
  if (NULL == gCtgMgmt.pCluster) {
D
dapan 已提交
1853
    qError("catalog cluster cache are not ready, clusterId:%"PRIx64, clusterId);
D
dapan1121 已提交
1854
    CTG_ERR_RET(TSDB_CODE_CTG_NOT_READY);
D
dapan 已提交
1855 1856
  }

D
dapan1121 已提交
1857 1858
  int32_t code = 0;
  SCatalog *clusterCtg = NULL;
D
dapan 已提交
1859

D
dapan1121 已提交
1860
  while (true) {
D
dapan 已提交
1861
    SCatalog **ctg = (SCatalog **)taosHashGet(gCtgMgmt.pCluster, (char*)&clusterId, sizeof(clusterId));
D
dapan 已提交
1862

D
dapan1121 已提交
1863 1864 1865 1866 1867
    if (ctg && (*ctg)) {
      *catalogHandle = *ctg;
      qDebug("got catalog handle from cache, clusterId:%"PRIx64", CTG:%p", clusterId, *ctg);
      return TSDB_CODE_SUCCESS;
    }
D
dapan 已提交
1868

D
dapan1121 已提交
1869 1870 1871 1872 1873 1874
    clusterCtg = calloc(1, sizeof(SCatalog));
    if (NULL == clusterCtg) {
      qError("calloc %d failed", (int32_t)sizeof(SCatalog));
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }

D
dapan1121 已提交
1875 1876
    clusterCtg->clusterId = clusterId;

D
dapan 已提交
1877 1878
    CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->dbRent, gCtgMgmt.cfg.dbRentSec, CTG_RENT_DB));
    CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->stbRent, gCtgMgmt.cfg.stbRentSec, CTG_RENT_STABLE));
D
dapan1121 已提交
1879

D
dapan 已提交
1880
    clusterCtg->dbCache = taosHashInit(gCtgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
D
dapan1121 已提交
1881 1882 1883 1884 1885
    if (NULL == clusterCtg->dbCache) {
      qError("taosHashInit %d dbCache failed", CTG_DEFAULT_CACHE_DB_NUMBER);
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
    }

D
dapan 已提交
1886
    SHashObj *metaCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
D
dapan1121 已提交
1887
    if (NULL == metaCache) {
D
dapan 已提交
1888
      qError("taosHashInit failed, num:%d", gCtgMgmt.cfg.maxTblCacheNum);
D
dapan1121 已提交
1889 1890 1891
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }
    
D
dapan 已提交
1892
    code = taosHashPut(gCtgMgmt.pCluster, &clusterId, sizeof(clusterId), &clusterCtg, POINTER_BYTES);
D
dapan1121 已提交
1893 1894 1895 1896 1897 1898 1899 1900 1901 1902 1903 1904 1905
    if (code) {
      if (HASH_NODE_EXIST(code)) {
        ctgFreeHandle(clusterCtg);
        continue;
      }
      
      qError("taosHashPut CTG to cache failed, clusterId:%"PRIx64, clusterId);
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
    }

    qDebug("add CTG to cache, clusterId:%"PRIx64", CTG:%p", clusterId, clusterCtg);

    break;
D
dapan 已提交
1906
  }
D
dapan1121 已提交
1907 1908

  *catalogHandle = clusterCtg;
D
dapan 已提交
1909
  
D
dapan1121 已提交
1910
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1911 1912 1913 1914 1915 1916 1917 1918

_return:

  ctgFreeHandle(clusterCtg);
  
  CTG_RET(code);
}

D
dapan1121 已提交
1919 1920
void catalogFreeHandle(SCatalog* pCtg) {
  if (NULL == pCtg) {
D
dapan1121 已提交
1921 1922
    return;
  }
D
dapan1121 已提交
1923

D
dapan 已提交
1924
  if (taosHashRemove(gCtgMgmt.pCluster, &pCtg->clusterId, sizeof(pCtg->clusterId))) {
D
dapan1121 已提交
1925
    ctgWarn("taosHashRemove from cluster failed, may already be freed, clusterId:%"PRIx64, pCtg->clusterId);
D
dapan1121 已提交
1926 1927 1928
    return;
  }

D
dapan1121 已提交
1929
  uint64_t clusterId = pCtg->clusterId;
D
dapan1121 已提交
1930
  
D
dapan1121 已提交
1931
  ctgFreeHandle(pCtg);
D
dapan1121 已提交
1932 1933

  ctgInfo("handle freed, culsterId:%"PRIx64, clusterId);
D
dapan 已提交
1934 1935
}

D
dapan1121 已提交
1936
int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version) {
D
dapan1121 已提交
1937 1938
  CTG_API_ENTER();

D
dapan1121 已提交
1939 1940 1941 1942 1943
  if (NULL == pCtg || NULL == dbFName || NULL == version) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }

  if (NULL == pCtg->dbCache) {
D
dapan1121 已提交
1944
    *version = CTG_DEFAULT_INVALID_VERSION;
D
dapan1121 已提交
1945
    ctgInfo("empty db cache, dbFName:%s", dbFName);
D
dapan1121 已提交
1946
    CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
1947 1948
  }

D
dapan1121 已提交
1949 1950 1951
  SCtgDBCache *dbCache = NULL;
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
  if (NULL == dbCache) {
D
dapan1121 已提交
1952
    *version = CTG_DEFAULT_INVALID_VERSION;
D
dapan1121 已提交
1953
    CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
1954 1955
  }

D
dapan 已提交
1956 1957 1958
  bool inCache = false;
  ctgAcquireVgInfo(pCtg, dbCache, &inCache);
  if (!inCache) {
D
dapan1121 已提交
1959
    ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
1960 1961

    *version = CTG_DEFAULT_INVALID_VERSION;
D
dapan1121 已提交
1962
    CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
1963 1964
  }

D
dapan1121 已提交
1965 1966 1967 1968
  *version = dbCache->vgInfo->vgVersion;

  ctgReleaseVgInfo(dbCache);
  ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
1969

D
dapan1121 已提交
1970
  ctgDebug("Got db vgVersion from cache, dbFName:%s, vgVersion:%d", dbFName, *version);
D
dapan1121 已提交
1971

D
dapan1121 已提交
1972
  CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
1973 1974
}

D
dapan1121 已提交
1975
int32_t catalogGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, bool forceUpdate, SArray** vgroupList) {
D
dapan1121 已提交
1976 1977
  CTG_API_ENTER();

D
dapan1121 已提交
1978 1979 1980
  if (NULL == pCtg || NULL == dbFName || NULL == pRpc || NULL == pMgmtEps || NULL == vgroupList) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }
1981

D
dapan1121 已提交
1982
  SCtgDBCache* dbCache = NULL;
1983
  int32_t code = 0;
D
dapan1121 已提交
1984
  SArray *vgList = NULL;
D
dapan1121 已提交
1985 1986 1987 1988 1989 1990 1991
  SHashObj *vgHash = NULL;
  SDBVgInfo *vgInfo = NULL;
  CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pRpc, pMgmtEps, dbFName, forceUpdate, &dbCache, &vgInfo));
  if (dbCache) {
    vgHash = dbCache->vgInfo->vgHash;
  } else {
    vgHash = vgInfo->vgHash;
D
dapan1121 已提交
1992 1993
  }

D
dapan1121 已提交
1994
  CTG_ERR_JRET(ctgGenerateVgList(pCtg, vgHash, &vgList));
D
dapan1121 已提交
1995 1996 1997 1998 1999

  *vgroupList = vgList;
  vgList = NULL;

_return:
D
dapan1121 已提交
2000 2001

  if (dbCache) {
D
dapan1121 已提交
2002 2003
    ctgReleaseVgInfo(dbCache);
    ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
2004 2005
  }

D
dapan1121 已提交
2006 2007 2008
  if (vgInfo) {
    taosHashCleanup(vgInfo->vgHash);
    tfree(vgInfo);
D
dapan1121 已提交
2009 2010
  }

D
dapan1121 已提交
2011
  CTG_API_LEAVE(code);  
D
dapan1121 已提交
2012 2013 2014
}


D
dapan1121 已提交
2015
int32_t catalogUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SDBVgInfo* dbInfo) {
D
dapan1121 已提交
2016
  CTG_API_ENTER();
D
dapan1121 已提交
2017 2018

  int32_t code = 0;
D
dapan1121 已提交
2019
  
D
dapan1121 已提交
2020
  if (NULL == pCtg || NULL == dbFName || NULL == dbInfo) {
D
dapan1121 已提交
2021 2022 2023
    CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT);
  }

D
dapan1121 已提交
2024 2025 2026 2027
  SCtgMetaAction action= {.act = CTG_ACT_UPDATE_VG};
  SCtgUpdateVgMsg *msg = malloc(sizeof(SCtgUpdateVgMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg));
D
dapan1121 已提交
2028
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
2029 2030
  }

D
dapan1121 已提交
2031 2032 2033 2034
  msg->pCtg = pCtg;
  strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  msg->dbId = dbId;
  msg->dbInfo = dbInfo;
D
dapan1121 已提交
2035
  dbInfo = NULL;
D
dapan1121 已提交
2036

D
dapan1121 已提交
2037
  action.data = msg;
D
dapan1121 已提交
2038

D
dapan1121 已提交
2039
  CTG_ERR_JRET(ctgPushAction(&action));
D
dapan1121 已提交
2040

D
dapan 已提交
2041 2042
  ctgDebug("action [%s] added into queue", gCtgAction[action.act].name);

D
dapan1121 已提交
2043 2044
  CTG_API_LEAVE(code);
  
D
dapan1121 已提交
2045 2046
_return:

D
dapan1121 已提交
2047 2048 2049
  if (dbInfo) {
    taosHashCleanup(dbInfo->vgHash);
    tfree(dbInfo);
D
dapan1121 已提交
2050
  }
D
dapan1121 已提交
2051 2052

  tfree(msg);
D
dapan1121 已提交
2053
  
D
dapan1121 已提交
2054
  CTG_API_LEAVE(code);
D
dapan1121 已提交
2055 2056 2057
}


D
dapan1121 已提交
2058 2059 2060
int32_t catalogRemoveDB(SCatalog* pCtg, const char* dbFName, uint64_t dbId) {
  CTG_API_ENTER();

D
dapan1121 已提交
2061 2062
  int32_t code = 0;
  
D
dapan1121 已提交
2063 2064
  if (NULL == pCtg || NULL == dbFName) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
2065 2066
  }

D
dapan1121 已提交
2067
  if (NULL == pCtg->dbCache) {
D
dapan1121 已提交
2068
    CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
2069
  }
D
dapan1121 已提交
2070

D
dapan1121 已提交
2071 2072 2073 2074 2075
  SCtgMetaAction action= {.act = CTG_ACT_REMOVE_DB};
  SCtgRemoveDBMsg *msg = malloc(sizeof(SCtgRemoveDBMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveDBMsg));
    CTG_API_LEAVE(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
2076
  }
D
dapan1121 已提交
2077

D
dapan1121 已提交
2078 2079 2080
  msg->pCtg = pCtg;
  strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  msg->dbId = dbId;
D
dapan1121 已提交
2081

D
dapan1121 已提交
2082 2083 2084 2085
  action.data = msg;

  CTG_ERR_JRET(ctgPushAction(&action));

D
dapan 已提交
2086 2087
  ctgDebug("action [%s] added into queue", gCtgAction[action.act].name);

D
dapan1121 已提交
2088
  CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
2089
  
D
dapan1121 已提交
2090 2091 2092
_return:

  tfree(action.data);
D
dapan1121 已提交
2093

D
dapan1121 已提交
2094
  CTG_API_LEAVE(code);
D
dapan1121 已提交
2095 2096
}

D
dapan1121 已提交
2097 2098 2099
int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, const char* stbName, uint64_t suid) {
  CTG_API_ENTER();

D
dapan 已提交
2100 2101
  int32_t code = 0;
  
D
dapan1121 已提交
2102 2103
  if (NULL == pCtg || NULL == dbFName || NULL == stbName) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan 已提交
2104 2105
  }

D
dapan1121 已提交
2106
  if (NULL == pCtg->dbCache) {
D
dapan1121 已提交
2107
    CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan 已提交
2108
  }
D
dapan1121 已提交
2109 2110 2111 2112 2113 2114

  SCtgMetaAction action= {.act = CTG_ACT_REMOVE_STB};
  SCtgRemoveStbMsg *msg = malloc(sizeof(SCtgRemoveStbMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveStbMsg));
    CTG_API_LEAVE(TSDB_CODE_CTG_MEM_ERROR);
D
dapan 已提交
2115 2116
  }

D
dapan1121 已提交
2117 2118 2119 2120 2121
  msg->pCtg = pCtg;
  strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  strncpy(msg->stbName, stbName, sizeof(msg->stbName));
  msg->dbId = dbId;
  msg->suid = suid;
D
dapan1121 已提交
2122

D
dapan1121 已提交
2123 2124 2125 2126
  action.data = msg;

  CTG_ERR_JRET(ctgPushAction(&action));

D
dapan 已提交
2127 2128
  ctgDebug("action [%s] added into queue", gCtgAction[action.act].name);

D
dapan1121 已提交
2129
  CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan 已提交
2130
  
D
dapan1121 已提交
2131 2132 2133 2134
_return:

  tfree(action.data);

D
dapan1121 已提交
2135
  CTG_API_LEAVE(code);
D
dapan 已提交
2136 2137
}

D
dapan1121 已提交
2138

D
dapan1121 已提交
2139
int32_t catalogGetTableMeta(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
D
dapan1121 已提交
2140 2141
  CTG_API_ENTER();

D
dapan1121 已提交
2142
  CTG_API_LEAVE(ctgGetTableMeta(pCtg, pTransporter, pMgmtEps, pTableName, false, pTableMeta, -1));
D
dapan1121 已提交
2143
}
D
dapan1121 已提交
2144

D
dapan1121 已提交
2145
int32_t catalogGetSTableMeta(SCatalog* pCtg, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
D
dapan1121 已提交
2146 2147
  CTG_API_ENTER();

D
dapan1121 已提交
2148
  CTG_API_LEAVE(ctgGetTableMeta(pCtg, pTransporter, pMgmtEps, pTableName, false, pTableMeta, 1));
D
dapan1121 已提交
2149 2150
}

D
dapan1121 已提交
2151 2152 2153 2154 2155 2156 2157 2158 2159 2160 2161 2162 2163
int32_t catalogUpdateSTableMeta(SCatalog* pCtg, STableMetaRsp *rspMsg) {
  CTG_API_ENTER();

  if (NULL == pCtg || NULL == rspMsg) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }

  STableMetaOutput *output = calloc(1, sizeof(STableMetaOutput));
  if (NULL == output) {
    ctgError("malloc %d failed", (int32_t)sizeof(STableMetaOutput));
    CTG_API_LEAVE(TSDB_CODE_CTG_MEM_ERROR);
  }
  
D
dapan1121 已提交
2164 2165
  int32_t code = 0;

D
dapan1121 已提交
2166 2167
  strcpy(output->dbFName, rspMsg->dbFName);
  strcpy(output->tbName, rspMsg->tbName);
D
dapan1121 已提交
2168

D
dapan1121 已提交
2169
  output->dbId = rspMsg->dbId;
D
dapan1121 已提交
2170
  
D
dapan1121 已提交
2171
  SET_META_TYPE_TABLE(output->metaType);
D
dapan1121 已提交
2172
  
D
dapan1121 已提交
2173
  CTG_ERR_JRET(queryCreateTableMetaFromMsg(rspMsg, true, &output->tbMeta));
D
dapan1121 已提交
2174

D
dapan1121 已提交
2175 2176 2177 2178 2179 2180
  SCtgMetaAction action= {.act = CTG_ACT_UPDATE_TBL};
  SCtgUpdateTblMsg *msg = malloc(sizeof(SCtgUpdateTblMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTblMsg));
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
  }
D
dapan1121 已提交
2181

D
dapan1121 已提交
2182 2183 2184 2185 2186 2187 2188
  msg->pCtg = pCtg;
  msg->output = output;

  action.data = msg;

  CTG_ERR_JRET(ctgPushAction(&action));

D
dapan 已提交
2189 2190
  ctgDebug("action [%s] added into queue", gCtgAction[action.act].name);

D
dapan1121 已提交
2191 2192
  CTG_API_LEAVE(code);
  
D
dapan1121 已提交
2193 2194
_return:

D
dapan1121 已提交
2195 2196 2197
  tfree(output->tbMeta);
  tfree(output);
  tfree(msg);
D
dapan1121 已提交
2198
  
D
dapan1121 已提交
2199
  CTG_API_LEAVE(code);
D
dapan1121 已提交
2200 2201 2202
}


D
dapan1121 已提交
2203
int32_t catalogRefreshTableMeta(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) {
D
dapan1121 已提交
2204 2205
  CTG_API_ENTER();

D
dapan1121 已提交
2206 2207 2208 2209 2210
  if (NULL == pCtg || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }

  CTG_API_LEAVE(ctgRefreshTblMeta(pCtg, pTransporter, pMgmtEps, pTableName, isSTable, NULL));
2211
}
2212

D
dapan1121 已提交
2213
int32_t catalogRefreshGetTableMeta(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable) {
D
dapan1121 已提交
2214 2215
  CTG_API_ENTER();

D
dapan1121 已提交
2216
  CTG_API_LEAVE(ctgGetTableMeta(pCtg, pTransporter, pMgmtEps, pTableName, true, pTableMeta, isSTable));
D
dapan1121 已提交
2217 2218
}

D
dapan1121 已提交
2219
int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgList) {
D
dapan1121 已提交
2220
  CTG_API_ENTER();
D
dapan1121 已提交
2221 2222 2223 2224

  if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pVgList) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }
D
dapan1121 已提交
2225 2226 2227 2228
  
  STableMeta *tbMeta = NULL;
  int32_t code = 0;
  SVgroupInfo vgroupInfo = {0};
D
dapan1121 已提交
2229
  SCtgDBCache* dbCache = NULL;
D
dapan1121 已提交
2230
  SArray *vgList = NULL;
D
dapan 已提交
2231
  SDBVgInfo *vgInfo = NULL;
D
dapan1121 已提交
2232

D
dapan1121 已提交
2233
  *pVgList = NULL;
D
dapan1121 已提交
2234
  
D
dapan1121 已提交
2235
  CTG_ERR_JRET(ctgGetTableMeta(pCtg, pRpc, pMgmtEps, pTableName, false, &tbMeta, -1));
D
dapan1121 已提交
2236

H
Haojun Liao 已提交
2237 2238
  char db[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, db);
D
dapan 已提交
2239

D
dapan1121 已提交
2240 2241 2242 2243 2244 2245 2246 2247 2248 2249
  SHashObj *vgHash = NULL;  
  CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pRpc, pMgmtEps, db, false, &dbCache, &vgInfo));

  if (dbCache) {
    vgHash = dbCache->vgInfo->vgHash;
  } else {
    vgHash = vgInfo->vgHash;
  }

  /* TODO REMOEV THIS ....
D
dapan 已提交
2250 2251 2252
  if (0 == tbMeta->vgId) {
    SVgroupInfo vgroup = {0};
    
D
dapan1121 已提交
2253
    catalogGetTableHashVgroup(pCtg, pRpc, pMgmtEps, pTableName, &vgroup);
D
dapan 已提交
2254 2255 2256

    tbMeta->vgId = vgroup.vgId;
  }
D
dapan1121 已提交
2257
  // TODO REMOVE THIS ....*/
D
dapan 已提交
2258

2259
  if (tbMeta->tableType == TSDB_SUPER_TABLE) {
D
dapan1121 已提交
2260
    CTG_ERR_JRET(ctgGenerateVgList(pCtg, vgHash, pVgList));
D
dapan1121 已提交
2261
  } else {
2262
    int32_t vgId = tbMeta->vgId;
D
dapan1121 已提交
2263
    if (NULL == taosHashGetClone(vgHash, &vgId, sizeof(vgId), &vgroupInfo)) {
2264
      ctgError("table's vgId not found in vgroup list, vgId:%d, tbName:%s", vgId, tNameGetTableName(pTableName));
D
dapan 已提交
2265
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);    
2266
    }
D
dapan1121 已提交
2267

D
dapan1121 已提交
2268 2269
    vgList = taosArrayInit(1, sizeof(SVgroupInfo));
    if (NULL == vgList) {
D
dapan1121 已提交
2270
      ctgError("taosArrayInit %d failed", (int32_t)sizeof(SVgroupInfo));
D
dapan 已提交
2271 2272 2273
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);    
    }

D
dapan1121 已提交
2274
    if (NULL == taosArrayPush(vgList, &vgroupInfo)) {
2275
      ctgError("taosArrayPush vgroupInfo to array failed, vgId:%d, tbName:%s", vgId, tNameGetTableName(pTableName));
D
dapan1121 已提交
2276 2277
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
    }
D
dapan 已提交
2278

D
dapan1121 已提交
2279
    *pVgList = vgList;
D
dapan1121 已提交
2280 2281
    vgList = NULL;
  }
D
dapan 已提交
2282

D
dapan1121 已提交
2283
_return:
D
dapan 已提交
2284

D
dapan1121 已提交
2285
  if (dbCache) {
D
dapan1121 已提交
2286 2287 2288 2289 2290 2291 2292 2293 2294
    ctgReleaseVgInfo(dbCache);
    ctgReleaseDBCache(pCtg, dbCache);
  }

  tfree(tbMeta);

  if (vgInfo) {
    taosHashCleanup(vgInfo->vgHash);
    tfree(vgInfo);
D
dapan1121 已提交
2295 2296 2297 2298 2299 2300
  }

  if (vgList) {
    taosArrayDestroy(vgList);
    vgList = NULL;
  }
D
dapan1121 已提交
2301
  
D
dapan1121 已提交
2302
  CTG_API_LEAVE(code);
D
dapan1121 已提交
2303 2304 2305
}


D
dapan1121 已提交
2306
int32_t catalogGetTableHashVgroup(SCatalog *pCtg, void *pTransporter, const SEpSet *pMgmtEps, const SName *pTableName, SVgroupInfo *pVgroup) {
D
dapan1121 已提交
2307 2308
  CTG_API_ENTER();

D
dapan1121 已提交
2309 2310
  SCtgDBCache* dbCache = NULL;
  int32_t code = 0;
H
Haojun Liao 已提交
2311 2312
  char db[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, db);
D
dapan1121 已提交
2313

D
dapan1121 已提交
2314 2315
  SDBVgInfo *vgInfo = NULL;
  CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pTransporter, pMgmtEps, db, false, &dbCache, &vgInfo));
D
dapan1121 已提交
2316

D
dapan1121 已提交
2317
  CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, vgInfo ? vgInfo : dbCache->vgInfo, pTableName, pVgroup));
D
dapan1121 已提交
2318

D
dapan1121 已提交
2319
_return:
D
dapan1121 已提交
2320

D
dapan1121 已提交
2321
  if (dbCache) {
D
dapan1121 已提交
2322 2323 2324 2325 2326 2327 2328
    ctgReleaseVgInfo(dbCache);
    ctgReleaseDBCache(pCtg, dbCache);
  }

  if (vgInfo) {
    taosHashCleanup(vgInfo->vgHash);
    tfree(vgInfo);
D
dapan1121 已提交
2329
  }
D
dapan1121 已提交
2330

D
dapan1121 已提交
2331
  CTG_API_LEAVE(code);
D
dapan1121 已提交
2332 2333 2334
}


D
dapan1121 已提交
2335
int32_t catalogGetAllMeta(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp) {
D
dapan1121 已提交
2336 2337
  CTG_API_ENTER();

D
dapan1121 已提交
2338 2339 2340 2341
  if (NULL == pCtg || NULL == pTransporter || NULL == pMgmtEps || NULL == pReq || NULL == pRsp) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }

D
dapan1121 已提交
2342
  int32_t code = 0;
D
dapan1121 已提交
2343
  pRsp->pTableMeta = NULL;
D
dapan1121 已提交
2344 2345 2346

  if (pReq->pTableName) {
    int32_t tbNum = (int32_t)taosArrayGetSize(pReq->pTableName);
D
dapan1121 已提交
2347
    if (tbNum <= 0) {
D
dapan1121 已提交
2348
      ctgError("empty table name list, tbNum:%d", tbNum);
D
dapan1121 已提交
2349
      CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
2350
    }
H
Haojun Liao 已提交
2351

D
dapan1121 已提交
2352 2353
    pRsp->pTableMeta = taosArrayInit(tbNum, POINTER_BYTES);
    if (NULL == pRsp->pTableMeta) {
D
dapan1121 已提交
2354
      ctgError("taosArrayInit %d failed", tbNum);
D
dapan1121 已提交
2355
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
2356 2357 2358 2359 2360 2361
    }
    
    for (int32_t i = 0; i < tbNum; ++i) {
      SName *name = taosArrayGet(pReq->pTableName, i);
      STableMeta *pTableMeta = NULL;
      
D
dapan1121 已提交
2362
      CTG_ERR_JRET(ctgGetTableMeta(pCtg, pTransporter, pMgmtEps, name, false, &pTableMeta, -1));
D
dapan1121 已提交
2363 2364 2365 2366 2367 2368 2369 2370 2371

      if (NULL == taosArrayPush(pRsp->pTableMeta, &pTableMeta)) {
        ctgError("taosArrayPush failed, idx:%d", i);
        tfree(pTableMeta);
        CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
      }
    }
  }

D
dapan1121 已提交
2372
  CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
2373 2374

_return:  
D
dapan1121 已提交
2375

D
dapan1121 已提交
2376 2377 2378 2379 2380 2381 2382 2383
  if (pRsp->pTableMeta) {
    int32_t aSize = taosArrayGetSize(pRsp->pTableMeta);
    for (int32_t i = 0; i < aSize; ++i) {
      STableMeta *pMeta = taosArrayGetP(pRsp->pTableMeta, i);
      tfree(pMeta);
    }
    
    taosArrayDestroy(pRsp->pTableMeta);
D
dapan1121 已提交
2384
    pRsp->pTableMeta = NULL;
D
dapan1121 已提交
2385
  }
D
dapan 已提交
2386
  
D
dapan1121 已提交
2387
  CTG_API_LEAVE(code);
2388
}
D
dapan 已提交
2389

D
dapan1121 已提交
2390
int32_t catalogGetQnodeList(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, SArray* pQnodeList) {
D
dapan1121 已提交
2391 2392
  CTG_API_ENTER();

D
dapan1121 已提交
2393 2394 2395 2396
  if (NULL == pCtg || NULL == pRpc  || NULL == pMgmtEps || NULL == pQnodeList) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }

D
dapan1121 已提交
2397
  //TODO
D
dapan 已提交
2398

D
dapan1121 已提交
2399
  CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan 已提交
2400 2401
}

D
dapan1121 已提交
2402
int32_t catalogGetExpiredSTables(SCatalog* pCtg, SSTableMetaVersion **stables, uint32_t *num) {
D
dapan1121 已提交
2403 2404
  CTG_API_ENTER();

D
dapan1121 已提交
2405 2406
  if (NULL == pCtg || NULL == stables || NULL == num) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
2407 2408
  }

D
dapan1121 已提交
2409 2410 2411 2412
  CTG_API_LEAVE(ctgMetaRentGet(&pCtg->stbRent, (void **)stables, num, sizeof(SSTableMetaVersion)));
}

int32_t catalogGetExpiredDBs(SCatalog* pCtg, SDbVgVersion **dbs, uint32_t *num) {
D
dapan1121 已提交
2413
  CTG_API_ENTER();
D
dapan1121 已提交
2414 2415 2416 2417
  
  if (NULL == pCtg || NULL == dbs || NULL == num) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }
D
dapan1121 已提交
2418

D
dapan1121 已提交
2419
  CTG_API_LEAVE(ctgMetaRentGet(&pCtg->dbRent, (void **)dbs, num, sizeof(SDbVgVersion)));
D
dapan1121 已提交
2420 2421
}

D
dapan 已提交
2422

D
dapan 已提交
2423
void catalogDestroy(void) {
D
dapan1121 已提交
2424 2425
  qInfo("start to destroy catalog");
  
D
dapan 已提交
2426
  if (NULL == gCtgMgmt.pCluster || atomic_load_8(&gCtgMgmt.exit)) {
D
dapan1121 已提交
2427 2428 2429
    return;
  }

D
dapan 已提交
2430 2431 2432
  atomic_store_8(&gCtgMgmt.exit, true);

  tsem_post(&gCtgMgmt.sem);
D
dapan1121 已提交
2433

D
dapan 已提交
2434
  CTG_LOCK(CTG_WRITE, &gCtgMgmt.lock);
D
dapan1121 已提交
2435

D
dapan1121 已提交
2436
  SCatalog *pCtg = NULL;
D
dapan 已提交
2437
  void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
D
dapan1121 已提交
2438
  while (pIter) {
D
dapan1121 已提交
2439
    pCtg = *(SCatalog **)pIter;
D
dapan1121 已提交
2440

D
dapan1121 已提交
2441 2442
    if (pCtg) {
      catalogFreeHandle(pCtg);
D
dapan1121 已提交
2443 2444
    }
    
D
dapan 已提交
2445
    pIter = taosHashIterate(gCtgMgmt.pCluster, pIter);
D
dapan 已提交
2446
  }
D
dapan1121 已提交
2447
  
D
dapan 已提交
2448 2449
  taosHashCleanup(gCtgMgmt.pCluster);
  gCtgMgmt.pCluster = NULL;
D
dapan1121 已提交
2450

D
dapan 已提交
2451
  CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.lock);
D
dapan1121 已提交
2452

D
dapan1121 已提交
2453
  qInfo("catalog destroyed");
D
dapan 已提交
2454 2455 2456 2457
}