catalog.c 68.6 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
H
Haojun Liao 已提交
14 15
 */

D
dapan1121 已提交
16
#include "trpc.h"
D
dapan1121 已提交
17
#include "query.h"
D
dapan1121 已提交
18
#include "tname.h"
H
Haojun Liao 已提交
19
#include "catalogInt.h"
20

D
dapan 已提交
21 22 23 24 25 26
int32_t ctgActUpdateVg(SCtgMetaAction *action);
int32_t ctgActUpdateTbl(SCtgMetaAction *action);
int32_t ctgActRemoveDB(SCtgMetaAction *action);
int32_t ctgActRemoveStb(SCtgMetaAction *action);
int32_t ctgActRemoveTbl(SCtgMetaAction *action);

D
dapan 已提交
27
SCatalogMgmt gCtgMgmt = {0};
D
dapan1121 已提交
28
SCtgDebug gCTGDebug = {0};
D
dapan 已提交
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
SCtgAction gCtgAction[CTG_ACT_MAX] = {{
                            CTG_ACT_UPDATE_VG,
                            "update vgInfo",
                            ctgActUpdateVg
                          },
                          {
                            CTG_ACT_UPDATE_TBL,
                            "update tbMeta",
                            ctgActUpdateTbl
                          },
                          {
                            CTG_ACT_REMOVE_DB,
                            "remove DB",
                            ctgActRemoveDB
                          },
                          {
                            CTG_ACT_REMOVE_STB,
                            "remove stbMeta",
                            ctgActRemoveStb
                          },
                          {
                            CTG_ACT_REMOVE_TBL,
                            "remove tbMeta",
                            ctgActRemoveTbl
                          }
};

int32_t ctgDbgEnableDebug(char *option) {
  if (0 == strcasecmp(option, "lock")) {
    gCTGDebug.lockDebug = true;
    qDebug("lock debug enabled");
    return TSDB_CODE_SUCCESS;
  }

  if (0 == strcasecmp(option, "cache")) {
    gCTGDebug.cacheDebug = true;
    qDebug("cache debug enabled");
    return TSDB_CODE_SUCCESS;
  }

  if (0 == strcasecmp(option, "api")) {
    gCTGDebug.apiDebug = true;
    qDebug("api debug enabled");
    return TSDB_CODE_SUCCESS;
  }

  qError("invalid debug option:%s", option);
  
  return TSDB_CODE_CTG_INTERNAL_ERROR;
}

int32_t ctgDbgGetStatNum(char *option, void *res) {
  if (0 == strcasecmp(option, "runtime.qDoneNum")) {
    *(uint64_t *)res = atomic_load_64(&gCtgMgmt.stat.runtime.qDoneNum);
    return TSDB_CODE_SUCCESS;
  }
85

D
dapan 已提交
86 87 88 89
  qError("invalid stat option:%s", option);
  
  return TSDB_CODE_CTG_INTERNAL_ERROR;
}
D
dapan1121 已提交
90

D
dapan1121 已提交
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
int32_t ctgDbgGetTbMetaNum(SCtgDBCache *dbCache) {
  return dbCache->tbCache.metaCache ? (int32_t)taosHashGetSize(dbCache->tbCache.metaCache) : 0;
}

int32_t ctgDbgGetStbNum(SCtgDBCache *dbCache) {
  return dbCache->tbCache.stbCache ? (int32_t)taosHashGetSize(dbCache->tbCache.stbCache) : 0;
}

int32_t ctgDbgGetRentNum(SCtgRentMgmt *rent) {
  int32_t num = 0;
  for (uint16_t i = 0; i < rent->slotNum; ++i) {
    SCtgRentSlot *slot = &rent->slots[i];
    if (NULL == slot->meta) {
      continue;
    }

    num += taosArrayGetSize(slot->meta);
  }

  return num;
}

D
dapan1121 已提交
113 114
int32_t ctgDbgGetClusterCacheNum(SCatalog* pCtg, int32_t type) {
  if (NULL == pCtg || NULL == pCtg->dbCache) {
D
dapan1121 已提交
115 116 117 118 119
    return 0;
  }

  switch (type) {
    case CTG_DBG_DB_NUM:
D
dapan1121 已提交
120
      return (int32_t)taosHashGetSize(pCtg->dbCache);
D
dapan1121 已提交
121
    case CTG_DBG_DB_RENT_NUM:
D
dapan1121 已提交
122
      return ctgDbgGetRentNum(&pCtg->dbRent);
D
dapan1121 已提交
123
    case CTG_DBG_STB_RENT_NUM:
D
dapan1121 已提交
124
      return ctgDbgGetRentNum(&pCtg->stbRent);
D
dapan1121 已提交
125 126 127 128 129 130
    default:
      break;
  }

  SCtgDBCache *dbCache = NULL;
  int32_t num = 0;
D
dapan1121 已提交
131
  void *pIter = taosHashIterate(pCtg->dbCache, NULL);
D
dapan1121 已提交
132 133 134 135 136 137 138 139 140 141 142 143 144
  while (pIter) {
    dbCache = (SCtgDBCache *)pIter;
    switch (type) {
      case CTG_DBG_META_NUM:
        num += ctgDbgGetTbMetaNum(dbCache);
        break;
      case CTG_DBG_STB_NUM:
        num += ctgDbgGetStbNum(dbCache);
        break;
      default:
        ctgError("invalid type:%d", type);
        break;
    }
D
dapan1121 已提交
145
    pIter = taosHashIterate(pCtg->dbCache, pIter);
D
dapan1121 已提交
146 147 148 149 150 151 152
  }

  return num;
}


void ctgDbgShowDBCache(SHashObj *dbHash) {
D
dapan1121 已提交
153 154 155 156 157 158 159 160 161 162 163 164 165
  if (NULL == dbHash) {
    return;
  }

  int32_t i = 0;
  SCtgDBCache *dbCache = NULL;
  void *pIter = taosHashIterate(dbHash, NULL);
  while (pIter) {
    char *dbFName = NULL;
    size_t len = 0;
    
    dbCache = (SCtgDBCache *)pIter;

D
dapan1121 已提交
166
    taosHashGetKey(dbCache, (void **)&dbFName, &len);
D
dapan1121 已提交
167
    
D
dapan1121 已提交
168
    CTG_CACHE_DEBUG("** %dth db [%.*s][%"PRIx64"] **", i, (int32_t)len, dbFName, dbCache->dbId);
D
dapan1121 已提交
169 170 171 172 173
    
    pIter = taosHashIterate(dbHash, pIter);
  }
}

D
dapan1121 已提交
174 175 176



D
dapan1121 已提交
177 178
void ctgDbgShowClusterCache(SCatalog* pCtg) {
  if (NULL == pCtg) {
D
dapan1121 已提交
179 180 181
    return;
  }

D
dapan1121 已提交
182 183 184
  CTG_CACHE_DEBUG("## cluster %"PRIx64" %p cache Info ##", pCtg->clusterId, pCtg);
  CTG_CACHE_DEBUG("db:%d meta:%d stb:%d dbRent:%d stbRent:%d", ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM), ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM), 
    ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_STB_NUM), ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_RENT_NUM), ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_STB_RENT_NUM));    
D
dapan1121 已提交
185
  
D
dapan1121 已提交
186
  ctgDbgShowDBCache(pCtg->dbCache);
D
dapan1121 已提交
187 188
}

D
dapan 已提交
189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219


void ctgPopAction(SCtgMetaAction **action) {
  SCtgQNode *orig = gCtgMgmt.head;
  
  SCtgQNode *node = gCtgMgmt.head->next;
  gCtgMgmt.head = gCtgMgmt.head->next;

  CTG_QUEUE_SUB();
  
  tfree(orig);

  *action = &node->action;
}


int32_t ctgPushAction(SCtgMetaAction *action) {
  SCtgQNode *node = calloc(1, sizeof(SCtgQNode));
  if (NULL == node) {
    qError("calloc %d failed", (int32_t)sizeof(SCtgQNode));
    CTG_RET(TSDB_CODE_CTG_MEM_ERROR);
  }
  
  node->action = *action;

  CTG_LOCK(CTG_WRITE, &gCtgMgmt.qlock);
  gCtgMgmt.tail->next = node;
  gCtgMgmt.tail = node;
  CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.qlock);

  CTG_QUEUE_ADD();
D
dapan1121 已提交
220
  CTG_STAT_ADD(gCtgMgmt.stat.runtime.qNum);
D
dapan 已提交
221 222 223 224 225 226 227

  tsem_post(&gCtgMgmt.sem);

  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244
void ctgFreeMetaRent(SCtgRentMgmt *mgmt) {
  if (NULL == mgmt->slots) {
    return;
  }

  for (int32_t i = 0; i < mgmt->slotNum; ++i) {
    SCtgRentSlot *slot = &mgmt->slots[i];
    if (slot->meta) {
      taosArrayDestroy(slot->meta);
      slot->meta = NULL;
    }
  }

  tfree(mgmt->slots);
}


D
dapan1121 已提交
245 246 247 248 249
void ctgFreeTableMetaCache(SCtgTbMetaCache *cache) {
  CTG_LOCK(CTG_WRITE, &cache->stbLock);
  if (cache->stbCache) {
    taosHashCleanup(cache->stbCache);
    cache->stbCache = NULL;
D
dapan1121 已提交
250
  }
D
dapan1121 已提交
251
  CTG_UNLOCK(CTG_WRITE, &cache->stbLock);
D
dapan1121 已提交
252

D
dapan1121 已提交
253 254 255 256
  CTG_LOCK(CTG_WRITE, &cache->metaLock);
  if (cache->metaCache) {
    taosHashCleanup(cache->metaCache);
    cache->metaCache = NULL;
D
dapan1121 已提交
257
  }
D
dapan1121 已提交
258
  CTG_UNLOCK(CTG_WRITE, &cache->metaLock);
D
dapan1121 已提交
259 260
}

D
dapan1121 已提交
261 262 263 264 265 266 267 268 269 270 271 272 273
void ctgFreeVgInfo(SDBVgInfo *vgInfo) {
  if (NULL == vgInfo) {
    return;
  }

  if (vgInfo->vgHash) {
    taosHashCleanup(vgInfo->vgHash);
    vgInfo->vgHash = NULL;
  }
  
  tfree(vgInfo);
}

D
dapan1121 已提交
274 275 276 277 278
void ctgFreeDbCache(SCtgDBCache *dbCache) {
  if (NULL == dbCache) {
    return;
  }

D
dapan1121 已提交
279
  CTG_LOCK(CTG_WRITE, &dbCache->vgLock);
D
dapan1121 已提交
280
  ctgFreeVgInfo (dbCache->vgInfo);
D
dapan1121 已提交
281
  CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
D
dapan1121 已提交
282 283 284 285 286

  ctgFreeTableMetaCache(&dbCache->tbCache);
}


D
dapan1121 已提交
287 288 289 290 291 292
void ctgFreeHandle(SCatalog* pCtg) {
  ctgFreeMetaRent(&pCtg->dbRent);
  ctgFreeMetaRent(&pCtg->stbRent);
  
  if (pCtg->dbCache) {
    void *pIter = taosHashIterate(pCtg->dbCache, NULL);
D
dapan1121 已提交
293 294 295
    while (pIter) {
      SCtgDBCache *dbCache = pIter;

D
dapan1121 已提交
296 297
      atomic_store_8(&dbCache->deleted, 1);

D
dapan1121 已提交
298
      ctgFreeDbCache(dbCache);
D
dapan1121 已提交
299 300
            
      pIter = taosHashIterate(pCtg->dbCache, pIter);
D
dapan1121 已提交
301 302
    }  

D
dapan1121 已提交
303
    taosHashCleanup(pCtg->dbCache);
D
dapan1121 已提交
304 305
  }
  
D
dapan1121 已提交
306
  free(pCtg);
D
dapan1121 已提交
307 308
}

D
dapan1121 已提交
309

D
dapan 已提交
310
int32_t ctgAcquireVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache, bool *inCache) {
D
dapan1121 已提交
311 312
  CTG_LOCK(CTG_READ, &dbCache->vgLock);
  
D
dapan 已提交
313 314 315 316 317 318 319 320 321 322
  if (dbCache->deleted) {
    CTG_UNLOCK(CTG_READ, &dbCache->vgLock);

    ctgDebug("db is dropping, dbId:%"PRIx64, dbCache->dbId);
    
    *inCache = false;
    return TSDB_CODE_SUCCESS;
  }

  
D
dapan1121 已提交
323 324 325
  if (NULL == dbCache->vgInfo) {
    CTG_UNLOCK(CTG_READ, &dbCache->vgLock);

D
dapan 已提交
326
    *inCache = false;
D
dapan1121 已提交
327
    ctgDebug("db vgInfo is empty, dbId:%"PRIx64, dbCache->dbId);
D
dapan1121 已提交
328 329 330
    return TSDB_CODE_SUCCESS;
  }

D
dapan 已提交
331 332
  *inCache = true;
  
D
dapan1121 已提交
333 334 335 336 337 338 339 340 341 342 343 344 345 346
  return TSDB_CODE_SUCCESS;
}

int32_t ctgWAcquireVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache) {
  CTG_LOCK(CTG_WRITE, &dbCache->vgLock);

  if (dbCache->deleted) {
    ctgDebug("db is dropping, dbId:%"PRIx64, dbCache->dbId);
    CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
    CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
  }

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
347

D
dapan1121 已提交
348 349 350
void ctgReleaseDBCache(SCatalog *pCtg, SCtgDBCache *dbCache) {
  taosHashRelease(pCtg->dbCache, dbCache);
}
D
dapan1121 已提交
351

D
dapan1121 已提交
352 353 354
void ctgReleaseVgInfo(SCtgDBCache *dbCache) {
  CTG_UNLOCK(CTG_READ, &dbCache->vgLock);
}
D
dapan1121 已提交
355

D
dapan1121 已提交
356 357 358
void ctgWReleaseVgInfo(SCtgDBCache *dbCache) {
  CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
}
D
dapan1121 已提交
359

D
dapan1121 已提交
360

D
dapan 已提交
361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398
int32_t ctgAcquireDBCacheImpl(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache, bool acquire) {
  SCtgDBCache *dbCache = NULL;
  if (acquire) {
    dbCache = (SCtgDBCache *)taosHashAcquire(pCtg->dbCache, dbFName, strlen(dbFName));
  } else {
    dbCache = (SCtgDBCache *)taosHashGet(pCtg->dbCache, dbFName, strlen(dbFName));
  }
  
  if (NULL == dbCache) {
    *pCache = NULL;
    ctgDebug("db not in cache, dbFName:%s", dbFName);
    return TSDB_CODE_SUCCESS;
  }

  if (dbCache->deleted) {
    if (acquire) {
      ctgReleaseDBCache(pCtg, dbCache);
    }    
    
    *pCache = NULL;
    ctgDebug("db is removing from cache, dbFName:%s", dbFName);
    return TSDB_CODE_SUCCESS;
  }

  *pCache = dbCache;
    
  return TSDB_CODE_SUCCESS;
}

int32_t ctgAcquireDBCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache) {
  CTG_RET(ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, true));
}

int32_t ctgGetDBCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache) {
  CTG_RET(ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, false));
}


D
dapan1121 已提交
399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414
int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache, bool *inCache) {
  if (NULL == pCtg->dbCache) {
    *pCache = NULL;
    *inCache = false;
    ctgWarn("empty db cache, dbFName:%s", dbFName);
    return TSDB_CODE_SUCCESS;
  }

  SCtgDBCache *dbCache = NULL;
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
  if (NULL == dbCache) {  
    *pCache = NULL;
    *inCache = false;
    return TSDB_CODE_SUCCESS;
  }
  
D
dapan 已提交
415 416
  ctgAcquireVgInfo(pCtg, dbCache, inCache);
  if (!(*inCache)) {
D
dapan1121 已提交
417 418 419 420
    ctgReleaseDBCache(pCtg, dbCache);
  
    *pCache = NULL;
    return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
421
  }
D
dapan1121 已提交
422

D
dapan1121 已提交
423
  *pCache = dbCache;
D
dapan1121 已提交
424
  *inCache = true;
D
dapan1121 已提交
425

D
dapan1121 已提交
426
  ctgDebug("Got db vgInfo from cache, dbFName:%s", dbFName);
D
dapan1121 已提交
427 428 429 430 431 432
  
  return TSDB_CODE_SUCCESS;
}



D
dapan1121 已提交
433
int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, SBuildUseDBInput *input, SUseDbOutput *out) {
D
dapan1121 已提交
434 435 436
  char *msg = NULL;
  int32_t msgLen = 0;

D
dapan 已提交
437
  ctgDebug("try to get db vgInfo from mnode, dbFName:%s", input->db);
D
dapan1121 已提交
438 439 440 441 442 443

  int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)](input, &msg, 0, &msgLen);
  if (code) {
    ctgError("Build use db msg failed, code:%x, db:%s", code, input->db);
    CTG_ERR_RET(code);
  }
D
ut test  
dapan1121 已提交
444
  
D
dapan1121 已提交
445
  SRpcMsg rpcMsg = {
H
Hongze Cheng 已提交
446
      .msgType = TDMT_MND_USE_DB,
D
catalog  
dapan1121 已提交
447
      .pCont   = msg,
D
dapan1121 已提交
448 449 450 451 452 453
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};

  rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
454
  if (TSDB_CODE_SUCCESS != rpcRsp.code) {
D
dapan1121 已提交
455 456 457 458 459
    if (CTG_DB_NOT_EXIST(rpcRsp.code)) {
      ctgDebug("db not exist in mnode, dbFName:%s", input->db);
      return rpcRsp.code;
    }
  
H
Haojun Liao 已提交
460
    ctgError("error rsp for use db, code:%s, db:%s", tstrerror(rpcRsp.code), input->db);
D
dapan1121 已提交
461
    CTG_ERR_RET(rpcRsp.code);
D
dapan1121 已提交
462
  }
D
dapan1121 已提交
463

D
dapan1121 已提交
464 465 466 467 468
  code = queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)](out, rpcRsp.pCont, rpcRsp.contLen);
  if (code) {
    ctgError("Process use db rsp failed, code:%x, db:%s", code, input->db);
    CTG_ERR_RET(code);
  }
D
dapan1121 已提交
469

D
dapan 已提交
470 471
  ctgDebug("Got db vgInfo from mnode, dbFName:%s", input->db);

D
dapan1121 已提交
472 473
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
474

D
dapan1121 已提交
475 476
int32_t ctgIsTableMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32_t *exist) {
  if (NULL == pCtg->dbCache) {
D
dapan1121 已提交
477
    *exist = 0;
D
dapan1121 已提交
478 479 480 481
    ctgWarn("empty db cache, dbFName:%s, tbName:%s", dbFName, tbName);
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
482 483
  SCtgDBCache *dbCache = NULL;
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
D
dapan1121 已提交
484 485
  if (NULL == dbCache) {
    *exist = 0;
D
dapan1121 已提交
486 487 488
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
489
  size_t sz = 0;
D
dapan1121 已提交
490 491 492 493
  CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);  
  STableMeta *tbMeta = taosHashGet(dbCache->tbCache.metaCache, tbName, strlen(tbName));
  CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
  
D
dapan1121 已提交
494
  if (NULL == tbMeta) {
D
dapan 已提交
495
    ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
496
    
D
dapan1121 已提交
497
    *exist = 0;
D
dapan1121 已提交
498
    ctgDebug("tbmeta not in cache, dbFName:%s, tbName:%s", dbFName, tbName);
D
dapan1121 已提交
499 500 501 502
    return TSDB_CODE_SUCCESS;
  }

  *exist = 1;
D
dapan1121 已提交
503

D
dapan 已提交
504
  ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
505
  
D
dapan1121 已提交
506
  ctgDebug("tbmeta is in cache, dbFName:%s, tbName:%s", dbFName, tbName);
D
dapan1121 已提交
507 508 509 510
  
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
511

D
dapan1121 已提交
512 513
int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STableMeta** pTableMeta, int32_t *exist) {
  if (NULL == pCtg->dbCache) {
D
dapan1121 已提交
514
    *exist = 0;
D
dapan1121 已提交
515
    ctgWarn("empty tbmeta cache, tbName:%s", pTableName->tname);
D
dapan1121 已提交
516 517 518
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
519 520
  char dbFName[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, dbFName);
D
dapan1121 已提交
521

D
dapan1121 已提交
522 523
  *pTableMeta = NULL;

D
dapan1121 已提交
524 525
  SCtgDBCache *dbCache = NULL;
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
D
dapan1121 已提交
526 527 528 529 530
  if (NULL == dbCache) {
    *exist = 0;
    return TSDB_CODE_SUCCESS;
  }
  
D
dapan1121 已提交
531 532 533 534 535
  size_t sz = 0;  
  CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);
  STableMeta *tbMeta = taosHashGetCloneExt(dbCache->tbCache.metaCache, pTableName->tname, strlen(pTableName->tname), NULL, (void **)pTableMeta, &sz);
  CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);

D
dapan1121 已提交
536
  if (NULL == *pTableMeta) {
D
dapan1121 已提交
537
    *exist = 0;
D
dapan1121 已提交
538 539
    ctgReleaseDBCache(pCtg, dbCache);
    ctgDebug("tbl not in cache, dbFName:%s, tbName:%s", dbFName, pTableName->tname);
D
dapan1121 已提交
540 541 542
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
543
  *exist = 1;
D
dapan1121 已提交
544 545
  
  tbMeta = *pTableMeta;
D
dapan1121 已提交
546

D
dapan1121 已提交
547
  if (tbMeta->tableType != TSDB_CHILD_TABLE) {
D
dapan1121 已提交
548
    ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
549
    ctgDebug("Got meta from cache, type:%d, dbFName:%s, tbName:%s", tbMeta->tableType, dbFName, pTableName->tname);
D
dapan1121 已提交
550 551 552
    return TSDB_CODE_SUCCESS;
  }
  
D
dapan1121 已提交
553
  CTG_LOCK(CTG_READ, &dbCache->tbCache.stbLock);
D
dapan1121 已提交
554
  
D
dapan1121 已提交
555
  STableMeta **stbMeta = taosHashGet(dbCache->tbCache.stbCache, &tbMeta->suid, sizeof(tbMeta->suid));
D
dapan1121 已提交
556
  if (NULL == stbMeta || NULL == *stbMeta) {
D
dapan1121 已提交
557
    CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
D
dapan1121 已提交
558 559
    ctgReleaseDBCache(pCtg, dbCache);
    ctgError("stb not in stbCache, suid:%"PRIx64, tbMeta->suid);
D
dapan1121 已提交
560 561 562 563
    tfree(*pTableMeta);
    *exist = 0;
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
564

D
dapan1121 已提交
565
  if ((*stbMeta)->suid != tbMeta->suid) {    
D
dapan1121 已提交
566
    CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
D
dapan1121 已提交
567
    ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
568
    tfree(*pTableMeta);
D
dapan1121 已提交
569
    ctgError("stable suid in stbCache mis-match, expected suid:%"PRIx64 ",actual suid:%"PRIx64, tbMeta->suid, (*stbMeta)->suid);
D
dapan1121 已提交
570 571
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }
D
dapan1121 已提交
572

D
dapan1121 已提交
573
  int32_t metaSize = CTG_META_SIZE(*stbMeta);
D
dapan1121 已提交
574 575
  *pTableMeta = realloc(*pTableMeta, metaSize);
  if (NULL == *pTableMeta) {    
D
dapan1121 已提交
576
    CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
D
dapan1121 已提交
577
    ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
578
    ctgError("realloc size[%d] failed", metaSize);
D
dapan1121 已提交
579
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
580 581
  }

D
dapan1121 已提交
582 583
  memcpy(&(*pTableMeta)->sversion, &(*stbMeta)->sversion, metaSize - sizeof(SCTableMeta));

D
dapan1121 已提交
584
  CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
D
dapan1121 已提交
585

D
dapan1121 已提交
586
  ctgReleaseDBCache(pCtg, dbCache);
D
dapan 已提交
587

D
dapan1121 已提交
588
  ctgDebug("Got tbmeta from cache, dbFName:%s, tbName:%s", dbFName, pTableName->tname);
D
dapan1121 已提交
589 590 591 592
  
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
593 594
int32_t ctgGetTableTypeFromCache(SCatalog* pCtg, const SName* pTableName, int32_t *tbType) {
  if (NULL == pCtg->dbCache) {
D
dapan1121 已提交
595
    ctgWarn("empty db cache, tbName:%s", pTableName->tname);  
D
dapan1121 已提交
596 597 598
    return TSDB_CODE_SUCCESS;
  }

D
dapan 已提交
599 600
  char dbFName[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, dbFName);
D
dapan1121 已提交
601

D
dapan 已提交
602 603
  SCtgDBCache *dbCache = NULL;
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
D
dapan1121 已提交
604 605 606
  if (NULL == dbCache) {
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
607

D
dapan1121 已提交
608 609 610
  CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);
  STableMeta *pTableMeta = (STableMeta *)taosHashAcquire(dbCache->tbCache.metaCache, pTableName->tname, strlen(pTableName->tname));

D
dapan1121 已提交
611
  if (NULL == pTableMeta) {
D
dapan1121 已提交
612
    CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
D
dapan 已提交
613 614
    ctgWarn("tbl not in cache, dbFName:%s, tbName:%s", dbFName, pTableName->tname);  
    ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
615
    
D
dapan1121 已提交
616 617 618
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
619 620
  *tbType = atomic_load_8(&pTableMeta->tableType);

D
dapan1121 已提交
621 622 623 624
  taosHashRelease(dbCache->tbCache.metaCache, pTableMeta);

  CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);

D
dapan 已提交
625
  ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
626

D
dapan 已提交
627
  ctgDebug("Got tbtype from cache, dbFName:%s, tbName:%s, type:%d", dbFName, pTableName->tname, *tbType);  
D
dapan1121 已提交
628 629 630 631
  
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
632
int32_t ctgGetTableMetaFromMnodeImpl(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgmtEps, char *dbFName, char* tbName, STableMetaOutput* output) {
D
dapan1121 已提交
633
  SBuildTableMetaInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = tbName};
D
dapan1121 已提交
634 635 636 637
  char *msg = NULL;
  SEpSet *pVnodeEpSet = NULL;
  int32_t msgLen = 0;

D
dapan1121 已提交
638
  ctgDebug("try to get table meta from mnode, dbFName:%s, tbName:%s", dbFName, tbName);
D
dapan1121 已提交
639 640 641 642 643 644

  int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_MND_STB_META)](&bInput, &msg, 0, &msgLen);
  if (code) {
    ctgError("Build mnode stablemeta msg failed, code:%x", code);
    CTG_ERR_RET(code);
  }
D
dapan1121 已提交
645 646 647 648 649 650

  SRpcMsg rpcMsg = {
      .msgType = TDMT_MND_STB_META,
      .pCont   = msg,
      .contLen = msgLen,
  };
D
dapan1121 已提交
651

D
dapan1121 已提交
652 653
  SRpcMsg rpcRsp = {0};

D
dapan1121 已提交
654
  rpcSendRecv(pTransporter, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
655 656
  
  if (TSDB_CODE_SUCCESS != rpcRsp.code) {
D
dapan1121 已提交
657
    if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) {
D
dapan1121 已提交
658
      SET_META_TYPE_NULL(output->metaType);
D
dapan1121 已提交
659
      ctgDebug("stablemeta not exist in mnode, dbFName:%s, tbName:%s", dbFName, tbName);
D
dapan1121 已提交
660 661 662
      return TSDB_CODE_SUCCESS;
    }
    
D
dapan1121 已提交
663
    ctgError("error rsp for stablemeta from mnode, code:%s, dbFName:%s, tbName:%s", tstrerror(rpcRsp.code), dbFName, tbName);
D
dapan1121 已提交
664 665 666
    CTG_ERR_RET(rpcRsp.code);
  }

D
dapan1121 已提交
667 668
  code = queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_STB_META)](output, rpcRsp.pCont, rpcRsp.contLen);
  if (code) {
D
dapan1121 已提交
669
    ctgError("Process mnode stablemeta rsp failed, code:%x, dbFName:%s, tbName:%s", code, dbFName, tbName);
D
dapan1121 已提交
670 671 672
    CTG_ERR_RET(code);
  }

D
dapan1121 已提交
673
  ctgDebug("Got table meta from mnode, dbFName:%s, tbName:%s", dbFName, tbName);
D
dapan1121 已提交
674 675 676 677

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
678
int32_t ctgGetTableMetaFromMnode(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMetaOutput* output) {
D
dapan1121 已提交
679 680
  char dbFName[TSDB_DB_FNAME_LEN];
  tNameGetFullDbName(pTableName, dbFName);
D
dapan1121 已提交
681

D
dapan1121 已提交
682
  return ctgGetTableMetaFromMnodeImpl(pCtg, pTransporter, pMgmtEps, dbFName, (char *)pTableName->tname, output);
D
dapan1121 已提交
683
}
D
dapan1121 已提交
684

D
dapan1121 已提交
685 686
int32_t ctgGetTableMetaFromVnode(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) {
  if (NULL == pCtg || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName || NULL == vgroupInfo || NULL == output) {
D
dapan1121 已提交
687
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
688 689
  }

D
dapan1121 已提交
690 691
  char dbFName[TSDB_DB_FNAME_LEN];
  tNameGetFullDbName(pTableName, dbFName);
D
dapan1121 已提交
692

D
dapan1121 已提交
693
  ctgDebug("try to get table meta from vnode, dbFName:%s, tbName:%s", dbFName, tNameGetTableName(pTableName));
D
dapan1121 已提交
694

D
dapan1121 已提交
695
  SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char *)tNameGetTableName(pTableName)};
D
dapan1121 已提交
696 697 698
  char *msg = NULL;
  int32_t msgLen = 0;

D
dapan1121 已提交
699 700
  int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)](&bInput, &msg, 0, &msgLen);
  if (code) {
D
dapan1121 已提交
701
    ctgError("Build vnode tablemeta msg failed, code:%x, dbFName:%s, tbName:%s", code, dbFName, tNameGetTableName(pTableName));
D
dapan1121 已提交
702 703
    CTG_ERR_RET(code);
  }
D
dapan1121 已提交
704 705

  SRpcMsg rpcMsg = {
H
Hongze Cheng 已提交
706
      .msgType = TDMT_VND_TABLE_META,
D
dapan1121 已提交
707 708 709 710 711
      .pCont   = msg,
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
H
Haojun Liao 已提交
712
  rpcSendRecv(pTransporter, &vgroupInfo->epset, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
713
  
D
dapan1121 已提交
714
  if (TSDB_CODE_SUCCESS != rpcRsp.code) {
D
dapan1121 已提交
715
    if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) {
D
dapan1121 已提交
716
      SET_META_TYPE_NULL(output->metaType);
D
dapan1121 已提交
717
      ctgDebug("tablemeta not exist in vnode, dbFName:%s, tbName:%s", dbFName, tNameGetTableName(pTableName));
D
dapan1121 已提交
718 719 720
      return TSDB_CODE_SUCCESS;
    }
  
D
dapan1121 已提交
721
    ctgError("error rsp for table meta from vnode, code:%s, dbFName:%s, tbName:%s", tstrerror(rpcRsp.code), dbFName, tNameGetTableName(pTableName));
D
dapan1121 已提交
722
    CTG_ERR_RET(rpcRsp.code);
D
dapan1121 已提交
723 724
  }

D
dapan1121 已提交
725 726
  code = queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)](output, rpcRsp.pCont, rpcRsp.contLen);
  if (code) {
D
dapan1121 已提交
727
    ctgError("Process vnode tablemeta rsp failed, code:%s, dbFName:%s, tbName:%s", tstrerror(code), dbFName, tNameGetTableName(pTableName));
D
dapan1121 已提交
728 729 730
    CTG_ERR_RET(code);
  }

D
dapan1121 已提交
731
  ctgDebug("Got table meta from vnode, dbFName:%s, tbName:%s", dbFName, tNameGetTableName(pTableName));
D
dapan1121 已提交
732 733 734 735
  return TSDB_CODE_SUCCESS;
}


736 737
int32_t ctgGetHashFunction(int8_t hashMethod, tableNameHashFp *fp) {
  switch (hashMethod) {
D
dapan1121 已提交
738 739 740 741 742 743 744 745
    default:
      *fp = MurmurHash3_32;
      break;
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
746
int32_t ctgGenerateVgList(SCatalog *pCtg, SHashObj *vgHash, SArray** pList) {
D
dapan1121 已提交
747
  SHashObj *vgroupHash = NULL;
748
  SVgroupInfo *vgInfo = NULL;
D
dapan1121 已提交
749 750
  SArray *vgList = NULL;
  int32_t code = 0;
D
dapan1121 已提交
751
  int32_t vgNum = taosHashGetSize(vgHash);
752

D
dapan1121 已提交
753
  vgList = taosArrayInit(vgNum, sizeof(SVgroupInfo));
D
dapan1121 已提交
754
  if (NULL == vgList) {
D
dapan1121 已提交
755
    ctgError("taosArrayInit failed, num:%d", vgNum);
D
dapan 已提交
756 757 758
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);    
  }

D
dapan1121 已提交
759
  void *pIter = taosHashIterate(vgHash, NULL);
760 761
  while (pIter) {
    vgInfo = pIter;
D
dapan1121 已提交
762

D
dapan1121 已提交
763
    if (NULL == taosArrayPush(vgList, vgInfo)) {
D
dapan1121 已提交
764
      ctgError("taosArrayPush failed, vgId:%d", vgInfo->vgId);
D
dapan1121 已提交
765
      taosHashCancelIterate(vgHash, pIter);      
D
dapan1121 已提交
766
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
767 768
    }
    
D
dapan1121 已提交
769
    pIter = taosHashIterate(vgHash, pIter);
770
    vgInfo = NULL;
D
dapan1121 已提交
771 772
  }

D
dapan1121 已提交
773
  *pList = vgList;
D
dapan1121 已提交
774

D
dapan1121 已提交
775
  ctgDebug("Got vgList from cache, vgNum:%d", vgNum);
D
dapan1121 已提交
776

D
dapan1121 已提交
777
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
778 779 780 781 782 783 784 785

_return:

  if (vgList) {
    taosArrayDestroy(vgList);
  }

  CTG_RET(code);
D
dapan1121 已提交
786 787
}

D
dapan1121 已提交
788
int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup) {
D
dapan1121 已提交
789 790
  int32_t code = 0;
  
D
dapan1121 已提交
791
  int32_t vgNum = taosHashGetSize(dbInfo->vgHash);
H
Haojun Liao 已提交
792 793 794
  char db[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, db);

795
  if (vgNum <= 0) {
D
dapan1121 已提交
796
    ctgError("db vgroup cache invalid, db:%s, vgroup number:%d", db, vgNum);
D
dapan1121 已提交
797
    CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED);
D
dapan1121 已提交
798 799
  }

800 801
  tableNameHashFp fp = NULL;
  SVgroupInfo *vgInfo = NULL;
D
dapan1121 已提交
802

D
dapan1121 已提交
803
  CTG_ERR_RET(ctgGetHashFunction(dbInfo->hashMethod, &fp));
804 805

  char tbFullName[TSDB_TABLE_FNAME_LEN];
H
Haojun Liao 已提交
806
  tNameExtractFullName(pTableName, tbFullName);
807 808 809

  uint32_t hashValue = (*fp)(tbFullName, (uint32_t)strlen(tbFullName));

D
dapan1121 已提交
810
  void *pIter = taosHashIterate(dbInfo->vgHash, NULL);
811 812 813
  while (pIter) {
    vgInfo = pIter;
    if (hashValue >= vgInfo->hashBegin && hashValue <= vgInfo->hashEnd) {
D
dapan1121 已提交
814
      taosHashCancelIterate(dbInfo->vgHash, pIter);
815
      break;
D
dapan1121 已提交
816
    }
817
    
D
dapan1121 已提交
818
    pIter = taosHashIterate(dbInfo->vgHash, pIter);
819
    vgInfo = NULL;
D
dapan1121 已提交
820 821
  }

822
  if (NULL == vgInfo) {
D
dapan1121 已提交
823 824
    ctgError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, db, taosHashGetSize(dbInfo->vgHash));
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
825 826 827 828
  }

  *pVgroup = *vgInfo;

829
  CTG_RET(code);
D
dapan1121 已提交
830 831
}

D
dapan1121 已提交
832
int32_t ctgSTableVersionCompare(const void* key1, const void* key2) {
D
dapan 已提交
833
  if (*(uint64_t *)key1 < ((SSTableMetaVersion*)key2)->suid) {
D
dapan1121 已提交
834
    return -1;
D
dapan 已提交
835
  } else if (*(uint64_t *)key1 > ((SSTableMetaVersion*)key2)->suid) {
D
dapan1121 已提交
836 837 838 839 840 841 842
    return 1;
  } else {
    return 0;
  }
}

int32_t ctgDbVgVersionCompare(const void* key1, const void* key2) {
D
dapan1121 已提交
843
  if (*(int64_t *)key1 < ((SDbVgVersion*)key2)->dbId) {
D
dapan1121 已提交
844
    return -1;
D
dapan1121 已提交
845
  } else if (*(int64_t *)key1 > ((SDbVgVersion*)key2)->dbId) {
D
dapan1121 已提交
846 847 848
    return 1;
  } else {
    return 0;
D
dapan1121 已提交
849
  }
D
dapan1121 已提交
850 851
}

D
dapan1121 已提交
852
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) {
D
dapan1121 已提交
853 854 855 856
  mgmt->slotRIdx = 0;
  mgmt->slotNum = rentSec / CTG_RENT_SLOT_SECOND;
  mgmt->type = type;

D
dapan1121 已提交
857
  size_t msgSize = sizeof(SCtgRentSlot) * mgmt->slotNum;
D
dapan1121 已提交
858
  
D
dapan1121 已提交
859 860 861
  mgmt->slots = calloc(1, msgSize);
  if (NULL == mgmt->slots) {
    qError("calloc %d failed", (int32_t)msgSize);
D
dapan 已提交
862
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
863
  }
D
dapan1121 已提交
864

D
dapan1121 已提交
865 866 867 868
  qDebug("meta rent initialized, type:%d, slotNum:%d", type, mgmt->slotNum);
  
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
869

D
dapan1121 已提交
870

D
dapan1121 已提交
871
int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size) {
D
dapan1121 已提交
872
  int16_t widx = abs(id % mgmt->slotNum);
D
dapan1121 已提交
873

D
dapan1121 已提交
874
  SCtgRentSlot *slot = &mgmt->slots[widx];
D
dapan1121 已提交
875 876 877 878 879 880 881 882
  int32_t code = 0;
  
  CTG_LOCK(CTG_WRITE, &slot->lock);
  if (NULL == slot->meta) {
    slot->meta = taosArrayInit(CTG_DEFAULT_RENT_SLOT_SIZE, size);
    if (NULL == slot->meta) {
      qError("taosArrayInit %d failed, id:%"PRIx64", slot idx:%d, type:%d", CTG_DEFAULT_RENT_SLOT_SIZE, id, widx, mgmt->type);
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
883
    }
D
dapan1121 已提交
884
  }
D
dapan1121 已提交
885

D
dapan1121 已提交
886 887 888
  if (NULL == taosArrayPush(slot->meta, meta)) {
    qError("taosArrayPush meta to rent failed, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
889 890
  }

D
dapan1121 已提交
891
  slot->needSort = true;
D
dapan1121 已提交
892

D
dapan1121 已提交
893
  qDebug("add meta to rent, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
D
dapan1121 已提交
894

D
dapan1121 已提交
895 896 897 898 899 900
_return:

  CTG_UNLOCK(CTG_WRITE, &slot->lock);
  CTG_RET(code);
}

D
dapan1121 已提交
901
int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t compare) {
D
dapan1121 已提交
902
  int16_t widx = abs(id % mgmt->slotNum);
D
dapan1121 已提交
903

D
dapan1121 已提交
904
  SCtgRentSlot *slot = &mgmt->slots[widx];
D
dapan1121 已提交
905 906 907 908
  int32_t code = 0;
  
  CTG_LOCK(CTG_WRITE, &slot->lock);
  if (NULL == slot->meta) {
D
dapan1121 已提交
909 910
    qError("empty meta slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
911 912 913
  }

  if (slot->needSort) {
D
dapan1121 已提交
914
    qDebug("meta slot before sorte, slot idx:%d, type:%d, size:%d", widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta));
D
dapan1121 已提交
915 916
    taosArraySort(slot->meta, compare);
    slot->needSort = false;
D
dapan1121 已提交
917
    qDebug("meta slot sorted, slot idx:%d, type:%d, size:%d", widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta));
D
dapan1121 已提交
918 919 920
  }

  void *orig = taosArraySearch(slot->meta, &id, compare, TD_EQ);
D
dapan1121 已提交
921
  if (NULL == orig) {
D
dapan1121 已提交
922
    qError("meta not found in slot, id:%"PRIx64", slot idx:%d, type:%d, size:%d", id, widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta));
D
dapan1121 已提交
923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  memcpy(orig, meta, size);

  qDebug("meta in rent updated, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);

_return:

  CTG_UNLOCK(CTG_WRITE, &slot->lock);

  if (code) {
    qWarn("meta in rent update failed, will try to add it, code:%x, id:%"PRIx64", slot idx:%d, type:%d", code, id, widx, mgmt->type);
    CTG_RET(ctgMetaRentAdd(mgmt, meta, id, size));
  }

  CTG_RET(code);
}

D
dapan1121 已提交
942
int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t compare) {
D
dapan1121 已提交
943 944
  int16_t widx = abs(id % mgmt->slotNum);

D
dapan1121 已提交
945
  SCtgRentSlot *slot = &mgmt->slots[widx];
D
dapan1121 已提交
946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977
  int32_t code = 0;
  
  CTG_LOCK(CTG_WRITE, &slot->lock);
  if (NULL == slot->meta) {
    qError("empty meta slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  if (slot->needSort) {
    taosArraySort(slot->meta, compare);
    slot->needSort = false;
    qDebug("meta slot sorted, slot idx:%d, type:%d", widx, mgmt->type);
  }

  int32_t idx = taosArraySearchIdx(slot->meta, &id, compare, TD_EQ);
  if (idx < 0) {
    qError("meta not found in slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  taosArrayRemove(slot->meta, idx);

  qDebug("meta in rent removed, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);

_return:

  CTG_UNLOCK(CTG_WRITE, &slot->lock);

  CTG_RET(code);
}


D
dapan1121 已提交
978
int32_t ctgMetaRentGetImpl(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
D
dapan1121 已提交
979 980 981 982
  int16_t ridx = atomic_add_fetch_16(&mgmt->slotRIdx, 1);
  if (ridx >= mgmt->slotNum) {
    ridx %= mgmt->slotNum;
    atomic_store_16(&mgmt->slotRIdx, ridx);
D
dapan1121 已提交
983
  }
D
dapan1121 已提交
984

D
dapan1121 已提交
985
  SCtgRentSlot *slot = &mgmt->slots[ridx];
D
dapan1121 已提交
986
  int32_t code = 0;
D
dapan1121 已提交
987
  
D
dapan1121 已提交
988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023
  CTG_LOCK(CTG_READ, &slot->lock);
  if (NULL == slot->meta) {
    qDebug("empty meta in slot:%d, type:%d", ridx, mgmt->type);
    *num = 0;
    goto _return;
  }

  size_t metaNum = taosArrayGetSize(slot->meta);
  if (metaNum <= 0) {
    qDebug("no meta in slot:%d, type:%d", ridx, mgmt->type);
    *num = 0;
    goto _return;
  }

  size_t msize = metaNum * size;
  *res = malloc(msize);
  if (NULL == *res) {
    qError("malloc %d failed", (int32_t)msize);
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
  }

  void *meta = taosArrayGet(slot->meta, 0);

  memcpy(*res, meta, msize);

  *num = (uint32_t)metaNum;

  qDebug("Got %d meta from rent, type:%d", (int32_t)metaNum, mgmt->type);

_return:

  CTG_UNLOCK(CTG_READ, &slot->lock);

  CTG_RET(code);
}

D
dapan1121 已提交
1024
int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
D
dapan1121 已提交
1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043
  while (true) {
    int64_t msec = taosGetTimestampMs();
    int64_t lsec = atomic_load_64(&mgmt->lastReadMsec);
    if ((msec - lsec) < CTG_RENT_SLOT_SECOND * 1000) {
      *res = NULL;
      *num = 0;
      qDebug("too short time period to get expired meta, type:%d", mgmt->type);
      return TSDB_CODE_SUCCESS;
    }

    if (lsec != atomic_val_compare_exchange_64(&mgmt->lastReadMsec, lsec, msec)) {
      continue;
    }

    break;
  }

  CTG_ERR_RET(ctgMetaRentGetImpl(mgmt, res, num, size));

D
dapan1121 已提交
1044 1045 1046
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1047
int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
D
dapan1121 已提交
1048
  int32_t code = 0;
D
dapan1121 已提交
1049

D
dapan1121 已提交
1050 1051 1052
  SCtgDBCache newDBCache = {0};
  newDBCache.dbId = dbId;

D
dapan 已提交
1053
  newDBCache.tbCache.metaCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
D
dapan1121 已提交
1054
  if (NULL == newDBCache.tbCache.metaCache) {
D
dapan 已提交
1055
    ctgError("taosHashInit %d metaCache failed", gCtgMgmt.cfg.maxTblCacheNum);
D
dapan1121 已提交
1056 1057 1058
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
  }

D
dapan 已提交
1059
  newDBCache.tbCache.stbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
D
dapan1121 已提交
1060
  if (NULL == newDBCache.tbCache.stbCache) {
D
dapan 已提交
1061
    ctgError("taosHashInit %d stbCache failed", gCtgMgmt.cfg.maxTblCacheNum);
D
dapan1121 已提交
1062 1063 1064 1065
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
  }

  code = taosHashPut(pCtg->dbCache, dbFName, strlen(dbFName), &newDBCache, sizeof(SCtgDBCache));
D
dapan1121 已提交
1066 1067 1068
  if (code) {
    if (HASH_NODE_EXIST(code)) {
      ctgDebug("db already in cache, dbFName:%s", dbFName);
D
dapan1121 已提交
1069
      goto _return;
D
dapan1121 已提交
1070 1071 1072
    }
    
    ctgError("taosHashPut db to cache failed, dbFName:%s", dbFName);
D
dapan1121 已提交
1073 1074 1075
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
  }
  
D
dapan1121 已提交
1076
  SDbVgVersion vgVersion = {.dbId = newDBCache.dbId, .vgVersion = -1};
D
dapan1121 已提交
1077 1078
  strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));

D
dapan1121 已提交
1079
  ctgDebug("db added to cache, dbFName:%s, dbId:%"PRIx64, dbFName, dbId);
D
dapan1121 已提交
1080

D
dapan1121 已提交
1081 1082 1083
  CTG_ERR_RET(ctgMetaRentAdd(&pCtg->dbRent, &vgVersion, dbId, sizeof(SDbVgVersion)));

  ctgDebug("db added to rent, dbFName:%s, vgVersion:%d, dbId:%"PRIx64, dbFName, vgVersion.vgVersion, dbId);
D
dapan1121 已提交
1084

D
dapan1121 已提交
1085
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1086

D
dapan1121 已提交
1087
_return:
D
dapan1121 已提交
1088

D
dapan1121 已提交
1089
  ctgFreeDbCache(&newDBCache);
D
dapan1121 已提交
1090

D
dapan1121 已提交
1091 1092
  CTG_RET(code);
}
D
dapan1121 已提交
1093

D
dapan1121 已提交
1094

D
dapan1121 已提交
1095
void ctgRemoveStbRent(SCatalog* pCtg, SCtgTbMetaCache *cache) {
D
dapan1121 已提交
1096 1097 1098 1099 1100 1101 1102
  CTG_LOCK(CTG_WRITE, &cache->stbLock);
  if (cache->stbCache) {
    void *pIter = taosHashIterate(cache->stbCache, NULL);
    while (pIter) {
      uint64_t *suid = NULL;
      taosHashGetKey(pIter, (void **)&suid, NULL);

D
dapan1121 已提交
1103
      if (TSDB_CODE_SUCCESS == ctgMetaRentRemove(&pCtg->stbRent, *suid, ctgSTableVersionCompare)) {
D
dapan1121 已提交
1104 1105 1106 1107 1108 1109 1110 1111 1112 1113
        ctgDebug("stb removed from rent, suid:%"PRIx64, *suid);
      }
          
      pIter = taosHashIterate(cache->stbCache, pIter);
    }
  }
  CTG_UNLOCK(CTG_WRITE, &cache->stbLock);
}


D
dapan1121 已提交
1114
int32_t ctgRemoveDB(SCatalog* pCtg, SCtgDBCache *dbCache, const char* dbFName) {
D
dapan 已提交
1115 1116 1117 1118
  uint64_t dbId = dbCache->dbId;
  
  ctgInfo("start to remove db from cache, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId);

D
dapan1121 已提交
1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131
  atomic_store_8(&dbCache->deleted, 1);

  ctgRemoveStbRent(pCtg, &dbCache->tbCache);

  ctgFreeDbCache(dbCache);

  ctgInfo("db removed from cache, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId);

  CTG_ERR_RET(ctgMetaRentRemove(&pCtg->dbRent, dbCache->dbId, ctgDbVgVersionCompare));
  
  ctgDebug("db removed from rent, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId);

  if (taosHashRemove(pCtg->dbCache, dbFName, strlen(dbFName))) {
D
dapan1121 已提交
1132 1133
    ctgInfo("taosHashRemove from dbCache failed, may be removed, dbFName:%s", dbFName);
    CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
D
dapan1121 已提交
1134
  }
D
dapan 已提交
1135 1136

  ctgInfo("db removed from cache, dbFName:%s, dbId:%"PRIx64, dbFName, dbId);
D
dapan1121 已提交
1137 1138 1139
  
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
1140 1141


D
dapan1121 已提交
1142
int32_t ctgGetAddDBCache(SCatalog* pCtg, const char *dbFName, uint64_t dbId, SCtgDBCache **pCache) {
D
dapan1121 已提交
1143 1144
  int32_t code = 0;
  SCtgDBCache *dbCache = NULL;
D
dapan1121 已提交
1145
  ctgGetDBCache(pCtg, dbFName, &dbCache);
D
dapan1121 已提交
1146
  
D
dapan1121 已提交
1147 1148
  if (dbCache) {
  // TODO OPEN IT
D
dapan1121 已提交
1149
#if 0    
D
dapan1121 已提交
1150 1151 1152 1153
    if (dbCache->dbId == dbId) {
      *pCache = dbCache;
      return TSDB_CODE_SUCCESS;
    }
D
dapan1121 已提交
1154
#else
D
dapan1121 已提交
1155 1156 1157
    if (0 == dbId) {
      *pCache = dbCache;
      return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1158 1159
    }

D
dapan1121 已提交
1160 1161 1162 1163 1164
    if (dbId && (dbCache->dbId == 0)) {
      dbCache->dbId = dbId;
      *pCache = dbCache;
      return TSDB_CODE_SUCCESS;
    }
D
dapan1121 已提交
1165
    
D
dapan1121 已提交
1166 1167 1168 1169 1170 1171
    if (dbCache->dbId == dbId) {
      *pCache = dbCache;
      return TSDB_CODE_SUCCESS;
    }
#endif
    CTG_ERR_RET(ctgRemoveDB(pCtg, dbCache, dbFName));
D
dapan1121 已提交
1172
  }
D
dapan1121 已提交
1173 1174
  
  CTG_ERR_RET(ctgAddNewDBCache(pCtg, dbFName, dbId));
D
dapan1121 已提交
1175

D
dapan1121 已提交
1176
  ctgGetDBCache(pCtg, dbFName, &dbCache);
D
dapan1121 已提交
1177

D
dapan1121 已提交
1178
  *pCache = dbCache;
D
dapan1121 已提交
1179

D
dapan1121 已提交
1180
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1181 1182 1183
}


D
dapan1121 已提交
1184 1185 1186 1187 1188 1189
int32_t ctgUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SDBVgInfo** pDbInfo) {
  int32_t code = 0;
  SDBVgInfo* dbInfo = *pDbInfo;
  
  if (NULL == dbInfo->vgHash || dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) {
    ctgError("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d", dbFName, dbInfo->vgHash, dbInfo->vgVersion);
D
dapan1121 已提交
1190
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
1191 1192
  }

D
dapan1121 已提交
1193
  bool newAdded = false;
D
dapan1121 已提交
1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204
  SDbVgVersion vgVersion = {.dbId = dbId, .vgVersion = dbInfo->vgVersion};

  SCtgDBCache *dbCache = NULL;
  CTG_ERR_RET(ctgGetAddDBCache(pCtg, dbFName, dbId, &dbCache));
  if (NULL == dbCache) {
    ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:%"PRIx64, dbFName, dbId);
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  SDBVgInfo *vgInfo = NULL;
  CTG_ERR_RET(ctgWAcquireVgInfo(pCtg, dbCache));
D
dapan1121 已提交
1205
  
D
dapan1121 已提交
1206 1207 1208 1209
  if (dbCache->vgInfo) {
    if (dbInfo->vgVersion <= dbCache->vgInfo->vgVersion) {
      ctgInfo("db vgVersion is old, dbFName:%s, vgVersion:%d, currentVersion:%d", dbFName, dbInfo->vgVersion, dbCache->vgInfo->vgVersion);
      ctgWReleaseVgInfo(dbCache);
D
dapan1121 已提交
1210
      
D
dapan1121 已提交
1211
      return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1212
    }
D
dapan1121 已提交
1213 1214

    ctgFreeVgInfo(dbCache->vgInfo);
D
dapan1121 已提交
1215 1216
  }

D
dapan1121 已提交
1217
  dbCache->vgInfo = dbInfo;
D
dapan1121 已提交
1218

D
dapan1121 已提交
1219
  *pDbInfo = NULL;
D
dapan1121 已提交
1220

D
dapan1121 已提交
1221
  ctgDebug("db vgInfo updated, dbFName:%s, vgVersion:%d, dbId:%"PRIx64, dbFName, vgVersion.vgVersion, vgVersion.dbId);
D
dapan1121 已提交
1222

D
dapan1121 已提交
1223
  ctgWReleaseVgInfo(dbCache);
D
dapan1121 已提交
1224

D
dapan1121 已提交
1225
  dbCache = NULL;
D
dapan1121 已提交
1226

D
dapan1121 已提交
1227 1228 1229
  strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
  CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion), ctgDbVgVersionCompare));
  
D
dapan1121 已提交
1230 1231 1232 1233
  CTG_RET(code);
}


D
dapan1121 已提交
1234 1235
int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, uint64_t dbId, char *tbName, STableMeta *meta, int32_t metaSize) {
  SCtgTbMetaCache *tbCache = &dbCache->tbCache;
D
dapan1121 已提交
1236

D
dapan1121 已提交
1237 1238 1239 1240 1241
  CTG_LOCK(CTG_READ, &tbCache->metaLock);
  if (dbCache->deleted || NULL == tbCache->metaCache || NULL == tbCache->stbCache) {
    CTG_UNLOCK(CTG_READ, &tbCache->metaLock);    
    ctgError("db is dropping, dbId:%"PRIx64, dbCache->dbId);
    CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
D
dapan1121 已提交
1242 1243
  }

D
dapan1121 已提交
1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257
  int8_t origType = 0;
  uint64_t origSuid = 0;
  bool isStb = meta->tableType == TSDB_SUPER_TABLE;
  STableMeta *orig = taosHashGet(tbCache->metaCache, tbName, strlen(tbName));
  if (orig) {
    origType = orig->tableType;
    origSuid = orig->suid;
    
    if (origType == TSDB_SUPER_TABLE && ((!isStb) || origSuid != meta->suid)) {
      CTG_LOCK(CTG_WRITE, &tbCache->stbLock);
      if (taosHashRemove(tbCache->stbCache, &orig->suid, sizeof(orig->suid))) {
        ctgError("stb not exist in stbCache, dbFName:%s, stb:%s, suid:%"PRIx64, dbFName, tbName, orig->suid);
      }
      CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
D
dapan1121 已提交
1258

D
dapan1121 已提交
1259 1260 1261 1262
      ctgDebug("stb removed from stbCache, dbFName:%s, stb:%s, suid:%"PRIx64, dbFName, tbName, orig->suid);
      
      ctgMetaRentRemove(&pCtg->stbRent, orig->suid, ctgSTableVersionCompare);
    }
D
dapan1121 已提交
1263
  }
D
dapan1121 已提交
1264

D
dapan1121 已提交
1265 1266
  if (isStb) {
    CTG_LOCK(CTG_WRITE, &tbCache->stbLock);
D
dapan1121 已提交
1267
  }
D
dapan1121 已提交
1268
  
D
dapan1121 已提交
1269 1270 1271 1272 1273 1274 1275 1276 1277
  if (taosHashPut(tbCache->metaCache, tbName, strlen(tbName), meta, metaSize) != 0) {
    if (isStb) {
      CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
    }
    
    CTG_UNLOCK(CTG_READ, &tbCache->metaLock);  
    ctgError("taosHashPut tbmeta to cache failed, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
  }
D
dapan1121 已提交
1278

D
dapan 已提交
1279 1280
  ctgDebug("tbmeta updated to cache, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);

D
dapan1121 已提交
1281 1282 1283
  if (!isStb) {
    CTG_UNLOCK(CTG_READ, &tbCache->metaLock);  
    return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1284
  }
D
dapan1121 已提交
1285

D
dapan1121 已提交
1286 1287 1288 1289 1290
  if (isStb && origSuid == meta->suid) {
    CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
    CTG_UNLOCK(CTG_READ, &tbCache->metaLock);  
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
1291

D
dapan1121 已提交
1292 1293 1294 1295 1296 1297
  STableMeta *tbMeta = taosHashGet(tbCache->metaCache, tbName, strlen(tbName));
  if (taosHashPut(tbCache->stbCache, &meta->suid, sizeof(meta->suid), &tbMeta, POINTER_BYTES) != 0) {
    CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
    CTG_UNLOCK(CTG_READ, &tbCache->metaLock);    
    ctgError("taosHashPutExt stable to stable cache failed, suid:%"PRIx64, meta->suid);
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
1298
  }
D
dapan1121 已提交
1299 1300
  
  CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
D
dapan1121 已提交
1301

D
dapan1121 已提交
1302 1303
  CTG_UNLOCK(CTG_READ, &tbCache->metaLock);

D
dapan 已提交
1304
  ctgDebug("stb updated to stbCache, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
D
dapan1121 已提交
1305 1306 1307 1308 1309 1310 1311

  SSTableMetaVersion metaRent = {.dbId = dbId, .suid = meta->suid, .sversion = meta->sversion, .tversion = meta->tversion};
  strcpy(metaRent.dbFName, dbFName);
  strcpy(metaRent.stbName, tbName);
  CTG_ERR_RET(ctgMetaRentAdd(&pCtg->stbRent, &metaRent, metaRent.suid, sizeof(SSTableMetaVersion)));
  
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1312 1313
}

D
dapan 已提交
1314 1315 1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352
int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst) {
  *dst = malloc(sizeof(SDBVgInfo));
  if (NULL == *dst) {
    qError("malloc %d failed", (int32_t)sizeof(SDBVgInfo));
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
  }

  memcpy(*dst, src, sizeof(SDBVgInfo));

  size_t hashSize = taosHashGetSize(src->vgHash);
  (*dst)->vgHash = taosHashInit(hashSize, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (NULL == (*dst)->vgHash) {
    qError("taosHashInit %d failed", (int32_t)hashSize);
    tfree(*dst);
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
  }

  int32_t *vgId = NULL;
  void *pIter = taosHashIterate(src->vgHash, NULL);
  while (pIter) {
    taosHashGetKey(pIter, (void **)&vgId, NULL);

    if (taosHashPut((*dst)->vgHash, (void *)vgId, sizeof(int32_t), pIter, sizeof(SVgroupInfo))) {
      qError("taosHashPut failed, hashSize:%d", (int32_t)hashSize);
      taosHashCancelIterate(src->vgHash, pIter);
      taosHashCleanup((*dst)->vgHash);
      tfree(*dst);
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }
    
    pIter = taosHashIterate(src->vgHash, pIter);
  }


  return TSDB_CODE_SUCCESS;
}



D
dapan1121 已提交
1353
int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, bool forceUpdate, SCtgDBCache** dbCache, SDBVgInfo **pInfo) {
D
dapan1121 已提交
1354
  bool inCache = false;
D
dapan1121 已提交
1355
  int32_t code = 0;
1356
  if (!forceUpdate) {
D
dapan1121 已提交
1357
    CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, dbCache, &inCache));
D
dapan1121 已提交
1358
    if (inCache) {
D
dapan1121 已提交
1359 1360 1361 1362 1363 1364 1365
      return TSDB_CODE_SUCCESS;
    }
  }

  SUseDbOutput DbOut = {0};
  SBuildUseDBInput input = {0};

D
dapan1121 已提交
1366
  tstrncpy(input.db, dbFName, tListLen(input.db));
D
dapan1121 已提交
1367
  input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
H
Haojun Liao 已提交
1368

D
dapan 已提交
1369
  CTG_ERR_RET(ctgGetDBVgInfoFromMnode(pCtg, pRpc, pMgmtEps, &input, &DbOut));
D
dapan1121 已提交
1370

D
dapan 已提交
1371 1372 1373 1374 1375 1376 1377 1378
  CTG_ERR_JRET(ctgCloneVgInfo(DbOut.dbVgroup, pInfo));
  
  SCtgMetaAction action= {.act = CTG_ACT_UPDATE_VG};
  SCtgUpdateVgMsg *msg = malloc(sizeof(SCtgUpdateVgMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg));
    ctgFreeVgInfo(DbOut.dbVgroup);
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
1379 1380
  }

D
dapan 已提交
1381 1382 1383 1384 1385 1386 1387 1388 1389 1390 1391
  strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  msg->pCtg = pCtg;
  msg->dbId = DbOut.dbId;
  msg->dbInfo = DbOut.dbVgroup;

  action.data = msg;

  CTG_ERR_JRET(ctgPushAction(&action));

  ctgDebug("action [%s] added into queue", gCtgAction[action.act].name);

D
dapan1121 已提交
1392
  return TSDB_CODE_SUCCESS;
D
dapan 已提交
1393 1394 1395 1396 1397 1398 1399 1400 1401

_return:

  tfree(*pInfo);
  tfree(msg);
  
  *pInfo = DbOut.dbVgroup;
  
  CTG_RET(code);
D
dapan1121 已提交
1402 1403
}

D
dapan 已提交
1404

D
dapan1121 已提交
1405 1406 1407 1408 1409
int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput) {
  *pOutput = malloc(sizeof(STableMetaOutput));
  if (NULL == *pOutput) {
    qError("malloc %d failed", (int32_t)sizeof(STableMetaOutput));
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan 已提交
1410 1411
  }

D
dapan1121 已提交
1412 1413 1414 1415 1416 1417
  memcpy(*pOutput, output, sizeof(STableMetaOutput));

  if (output->tbMeta) {
    int32_t metaSize = CTG_META_SIZE(output->tbMeta);
    (*pOutput)->tbMeta = malloc(metaSize);
    if (NULL == (*pOutput)->tbMeta) {
D
dapan 已提交
1418
      qError("malloc %d failed", (int32_t)sizeof(STableMetaOutput));
D
dapan1121 已提交
1419 1420 1421 1422 1423
      tfree(*pOutput);
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }

    memcpy((*pOutput)->tbMeta, output->tbMeta, metaSize);
D
dapan 已提交
1424 1425
  }

D
dapan1121 已提交
1426 1427 1428
  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
1429 1430


D
dapan1121 已提交
1431 1432
int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable, STableMetaOutput **pOutput) {
  if (NULL == pCtg || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) {
D
dapan1121 已提交
1433 1434 1435 1436 1437 1438
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  SVgroupInfo vgroupInfo = {0};
  int32_t code = 0;

D
dapan1121 已提交
1439
  CTG_ERR_RET(catalogGetTableHashVgroup(pCtg, pTransporter, pMgmtEps, pTableName, &vgroupInfo));
D
dapan1121 已提交
1440

D
dapan 已提交
1441
  SCtgUpdateTblMsg *msg = NULL;
D
dapan1121 已提交
1442
  STableMetaOutput  moutput = {0};
S
Shengliang Guan 已提交
1443
  STableMetaOutput *output = calloc(1, sizeof(STableMetaOutput));
D
dapan1121 已提交
1444 1445 1446 1447 1448
  if (NULL == output) {
    ctgError("malloc %d failed", (int32_t)sizeof(STableMetaOutput));
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
  }
  
D
dapan1121 已提交
1449
  if (CTG_IS_STABLE(isSTable)) {
D
dapan 已提交
1450
    ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s", tNameGetTableName(pTableName));
D
dapan1121 已提交
1451 1452

    // if get from mnode failed, will not try vnode
D
dapan1121 已提交
1453
    CTG_ERR_JRET(ctgGetTableMetaFromMnode(pCtg, pTransporter, pMgmtEps, pTableName, output));
D
dapan1121 已提交
1454

D
dapan1121 已提交
1455 1456
    if (CTG_IS_META_NULL(output->metaType)) {
      CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCtg, pTransporter, pMgmtEps, pTableName, &vgroupInfo, output));
D
dapan1121 已提交
1457 1458
    }
  } else {
D
dapan 已提交
1459
    ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, isStable:%d", tNameGetTableName(pTableName), isSTable);
D
dapan1121 已提交
1460 1461

    // if get from vnode failed or no table meta, will not try mnode
D
dapan1121 已提交
1462
    CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCtg, pTransporter, pMgmtEps, pTableName, &vgroupInfo, output));
D
dapan1121 已提交
1463

D
dapan1121 已提交
1464
    if (CTG_IS_META_TABLE(output->metaType) && TSDB_SUPER_TABLE == output->tbMeta->tableType) {
D
dapan 已提交
1465
      ctgDebug("will continue to refresh tbmeta since got stb, tbName:%s, metaType:%d", tNameGetTableName(pTableName), output->metaType);
D
dapan1121 已提交
1466

D
dapan1121 已提交
1467
      tfree(output->tbMeta);
D
dapan1121 已提交
1468
      
D
dapan1121 已提交
1469 1470
      CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCtg, pTransporter, pMgmtEps, output->dbFName, output->tbName, output));
    } else if (CTG_IS_META_BOTH(output->metaType)) {
D
dapan1121 已提交
1471
      int32_t exist = 0;
D
dapan1121 已提交
1472
      CTG_ERR_JRET(ctgIsTableMetaExistInCache(pCtg, output->dbFName, output->tbName, &exist));
D
dapan1121 已提交
1473
      if (0 == exist) {
D
dapan1121 已提交
1474
        CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCtg, pTransporter, pMgmtEps, output->dbFName, output->tbName, &moutput));
D
dapan1121 已提交
1475

D
dapan1121 已提交
1476
        if (CTG_IS_META_NULL(moutput.metaType)) {
D
dapan1121 已提交
1477
          SET_META_TYPE_NULL(output->metaType);
D
dapan1121 已提交
1478 1479
        }
        
D
dapan1121 已提交
1480 1481
        tfree(output->tbMeta);
        output->tbMeta = moutput.tbMeta;
D
dapan1121 已提交
1482 1483
        moutput.tbMeta = NULL;
      } else {
D
dapan1121 已提交
1484
        tfree(output->tbMeta);
D
dapan1121 已提交
1485
        
D
dapan1121 已提交
1486
        SET_META_TYPE_CTABLE(output->metaType); 
D
dapan1121 已提交
1487
      }
D
dapan1121 已提交
1488 1489 1490
    }
  }

D
dapan1121 已提交
1491
  if (CTG_IS_META_NULL(output->metaType)) {
D
dapan 已提交
1492
    ctgError("no tbmeta got, tbNmae:%s", tNameGetTableName(pTableName));
D
dapan1121 已提交
1493 1494 1495
    CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
  }

D
dapan 已提交
1496 1497 1498 1499 1500 1501
  if (CTG_IS_META_TABLE(output->metaType)) {
    ctgDebug("tbmeta got, dbFName:%s, tbName:%s, tbType:%d", output->dbFName, output->tbName, output->tbMeta->tableType);
  } else {
    ctgDebug("tbmeta got, dbFName:%s, tbName:%s, tbType:%d, stbMetaGot:%d", output->dbFName, output->ctbName, output->ctbMeta.tableType, CTG_IS_META_BOTH(output->metaType));
  }

D
dapan1121 已提交
1502 1503 1504 1505 1506
  if (pOutput) {
    CTG_ERR_JRET(ctgCloneMetaOutput(output, pOutput));
  }

  SCtgMetaAction action= {.act = CTG_ACT_UPDATE_TBL};
D
dapan 已提交
1507
  msg = malloc(sizeof(SCtgUpdateTblMsg));
D
dapan1121 已提交
1508 1509 1510 1511 1512 1513 1514 1515 1516 1517 1518 1519
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTblMsg));
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
  }

  msg->pCtg = pCtg;
  msg->output = output;

  action.data = msg;

  CTG_ERR_JRET(ctgPushAction(&action));

D
dapan 已提交
1520 1521
  ctgDebug("action [%s] added into queue", gCtgAction[action.act].name);

D
dapan1121 已提交
1522
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1523 1524 1525

_return:

D
dapan1121 已提交
1526
  tfree(output->tbMeta);
D
dapan 已提交
1527
  tfree(output);
D
dapan1121 已提交
1528
  tfree(msg);
D
dapan1121 已提交
1529 1530 1531 1532
  
  CTG_RET(code);
}

D
dapan1121 已提交
1533 1534
int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, bool forceUpdate, STableMeta** pTableMeta, int32_t isSTable) {
  if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) {
D
dapan1121 已提交
1535 1536 1537 1538
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }
  
  int32_t exist = 0;
D
dapan1121 已提交
1539
  int32_t code = 0;
D
dapan1121 已提交
1540 1541

  if (!forceUpdate) {
D
dapan1121 已提交
1542
    CTG_ERR_RET(ctgGetTableMetaFromCache(pCtg, pTableName, pTableMeta, &exist));
D
dapan1121 已提交
1543 1544 1545 1546

    if (exist && CTG_TBTYPE_MATCH(isSTable, (*pTableMeta)->tableType)) {
      return TSDB_CODE_SUCCESS;
    }
D
dapan1121 已提交
1547 1548

    tfree(*pTableMeta);
D
dapan1121 已提交
1549 1550 1551
  } else if (CTG_IS_UNKNOWN_STABLE(isSTable)) {
    int32_t tbType = 0;
    
D
dapan1121 已提交
1552
    CTG_ERR_RET(ctgGetTableTypeFromCache(pCtg, pTableName, &tbType));
D
dapan1121 已提交
1553 1554 1555 1556

    CTG_SET_STABLE(isSTable, tbType);
  }

D
dapan1121 已提交
1557 1558
  STableMetaOutput *output = NULL;

D
dapan 已提交
1559 1560
  while (true) {
    CTG_ERR_JRET(ctgRefreshTblMeta(pCtg, pRpc, pMgmtEps, pTableName, isSTable, &output));
D
dapan1121 已提交
1561

D
dapan 已提交
1562 1563 1564 1565
    if (CTG_IS_META_TABLE(output->metaType)) {
      *pTableMeta = output->tbMeta;
      goto _return;
    }
D
dapan1121 已提交
1566

D
dapan 已提交
1567 1568 1569 1570 1571 1572
    if (CTG_IS_META_BOTH(output->metaType)) {
      memcpy(output->tbMeta, &output->ctbMeta, sizeof(output->ctbMeta));
      
      *pTableMeta = output->tbMeta;
      goto _return;
    }
D
dapan1121 已提交
1573

D
dapan 已提交
1574 1575 1576 1577 1578
    if ((!CTG_IS_META_CTABLE(output->metaType)) || output->tbMeta) {
      ctgError("invalid metaType:%d", output->metaType);
      tfree(output->tbMeta);
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
    }
D
dapan1121 已提交
1579

D
dapan 已提交
1580
    // HANDLE ONLY CHILD TABLE META
D
dapan1121 已提交
1581

D
dapan 已提交
1582 1583 1584 1585 1586 1587 1588 1589
    SName stbName = *pTableName;
    strcpy(stbName.tname, output->tbName);
    
    CTG_ERR_JRET(ctgGetTableMetaFromCache(pCtg, &stbName, pTableMeta, &exist));
    if (0 == exist) {
      ctgDebug("stb no longer exist, dbFName:%s, tbName:%s", output->dbFName, pTableName->tname);
      continue;
    }
D
dapan1121 已提交
1590

D
dapan 已提交
1591 1592 1593 1594
    memcpy(*pTableMeta, &output->ctbMeta, sizeof(output->ctbMeta));

    break;
  }
D
dapan1121 已提交
1595 1596 1597 1598 1599

_return:

  tfree(output);

D
dapan 已提交
1600 1601 1602 1603
  if (*pTableMeta) {
    ctgDebug("tbmeta returned, tbName:%s, tbType:%d", pTableName->tname, (*pTableMeta)->tableType);
  }

D
dapan1121 已提交
1604 1605 1606 1607 1608 1609 1610 1611 1612 1613 1614 1615 1616 1617 1618 1619 1620 1621 1622 1623 1624 1625 1626 1627 1628
  CTG_RET(code);
}



int32_t ctgActUpdateVg(SCtgMetaAction *action) {
  int32_t code = 0;
  SCtgUpdateVgMsg *msg = action->data;
  
  CTG_ERR_JRET(ctgUpdateDBVgInfo(msg->pCtg, msg->dbFName, msg->dbId, &msg->dbInfo));

_return:

  tfree(msg->dbInfo);
  tfree(msg);
  
  CTG_RET(code);
}

int32_t ctgActRemoveDB(SCtgMetaAction *action) {
  int32_t code = 0;
  SCtgRemoveDBMsg *msg = action->data;
  SCatalog* pCtg = msg->pCtg;

  SCtgDBCache *dbCache = NULL;
D
dapan 已提交
1629
  ctgGetDBCache(msg->pCtg, msg->dbFName, &dbCache);
D
dapan1121 已提交
1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650 1651 1652 1653 1654 1655 1656 1657
  if (NULL == dbCache) {
    goto _return;
  }
  
  if (dbCache->dbId != msg->dbId) {
    ctgInfo("dbId already updated, dbFName:%s, dbId:%"PRIx64 ", targetId:%"PRIx64, msg->dbFName, dbCache->dbId, msg->dbId);
    goto _return;
  }
  
  CTG_ERR_JRET(ctgRemoveDB(pCtg, dbCache, msg->dbFName));

_return:

  tfree(msg);
  
  CTG_RET(code);
}


int32_t ctgActUpdateTbl(SCtgMetaAction *action) {
  int32_t code = 0;
  SCtgUpdateTblMsg *msg = action->data;
  SCatalog* pCtg = msg->pCtg;
  STableMetaOutput* output = msg->output;
  SCtgDBCache *dbCache = NULL;

  if ((!CTG_IS_META_CTABLE(output->metaType)) && NULL == output->tbMeta) {
    ctgError("no valid tbmeta got from meta rsp, dbFName:%s, tbName:%s", output->dbFName, output->tbName);
D
dapan 已提交
1658
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679 1680 1681 1682 1683
  }

  if (CTG_IS_META_BOTH(output->metaType) && TSDB_SUPER_TABLE != output->tbMeta->tableType) {
    ctgError("table type error, expected:%d, actual:%d", TSDB_SUPER_TABLE, output->tbMeta->tableType);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }    

  CTG_ERR_JRET(ctgGetAddDBCache(pCtg, output->dbFName, output->dbId, &dbCache));
  if (NULL == dbCache) {
    ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:%"PRIx64, output->dbFName, output->dbId);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  if (CTG_IS_META_TABLE(output->metaType) || CTG_IS_META_BOTH(output->metaType)) {
    int32_t metaSize = CTG_META_SIZE(output->tbMeta);
    
    CTG_ERR_JRET(ctgUpdateTblMeta(pCtg, dbCache, output->dbFName, output->dbId, output->tbName, output->tbMeta, metaSize));
  }

  if (CTG_IS_META_CTABLE(output->metaType) || CTG_IS_META_BOTH(output->metaType)) {
    CTG_ERR_JRET(ctgUpdateTblMeta(pCtg, dbCache, output->dbFName, output->dbId, output->ctbName, (STableMeta *)&output->ctbMeta, sizeof(output->ctbMeta)));
  }

_return:

D
dapan 已提交
1684 1685 1686
  if (output) {
    tfree(output->tbMeta);
    tfree(output);
D
dapan1121 已提交
1687
  }
D
dapan 已提交
1688
  
D
dapan1121 已提交
1689 1690 1691 1692 1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723
  tfree(msg);
  
  CTG_RET(code);
}


int32_t ctgActRemoveStb(SCtgMetaAction *action) {
  int32_t code = 0;
  SCtgRemoveStbMsg *msg = action->data;
  bool removed = false;
  SCatalog* pCtg = msg->pCtg;

  SCtgDBCache *dbCache = NULL;
  ctgGetDBCache(pCtg, msg->dbFName, &dbCache);
  if (NULL == dbCache) {
    return TSDB_CODE_SUCCESS;
  }

  if (dbCache->dbId != msg->dbId) {
    ctgDebug("dbId already modified, dbFName:%s, current:%"PRIx64", dbId:%"PRIx64", stb:%s, suid:%"PRIx64, msg->dbFName, dbCache->dbId, msg->dbId, msg->stbName, msg->suid);
    return TSDB_CODE_SUCCESS;
  }
  
  CTG_LOCK(CTG_WRITE, &dbCache->tbCache.stbLock);
  if (taosHashRemove(dbCache->tbCache.stbCache, &msg->suid, sizeof(msg->suid))) {
    CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock);
    ctgDebug("stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid);
    return TSDB_CODE_SUCCESS;
  }

  CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);
  if (taosHashRemove(dbCache->tbCache.metaCache, msg->stbName, strlen(msg->stbName))) {  
    CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
    CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock);
    ctgError("stb not exist in cache, dbFName:%s, stb:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid);
D
dapan1121 已提交
1724
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750 1751 1752 1753
  }  
  CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
  
  CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock);
  
  ctgInfo("stb removed from cache, dbFName:%s, stbName:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid);

  CTG_ERR_JRET(ctgMetaRentRemove(&msg->pCtg->stbRent, msg->suid, ctgSTableVersionCompare));
  
  ctgDebug("stb removed from rent, dbFName:%s, stbName:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid);
  
_return:

  tfree(msg);
  
  CTG_RET(code);
}

int32_t ctgActRemoveTbl(SCtgMetaAction *action) {

}



void* ctgUpdateThreadFunc(void* param) {
  setThreadName("catalog");

  qInfo("catalog update thread started");

D
dapan 已提交
1754
  CTG_LOCK(CTG_READ, &gCtgMgmt.lock);
D
dapan1121 已提交
1755 1756
  
  while (true) {
D
dapan 已提交
1757
    tsem_wait(&gCtgMgmt.sem);
D
dapan1121 已提交
1758
    
D
dapan 已提交
1759
    if (atomic_load_8(&gCtgMgmt.exit)) {
D
dapan1121 已提交
1760 1761 1762 1763 1764 1765
      break;
    }

    SCtgMetaAction *action = NULL;
    ctgPopAction(&action);

D
dapan 已提交
1766 1767 1768 1769 1770
    qDebug("process %s action", gCtgAction[action->act].name);
    
    (*gCtgAction[action->act].func)(action);

    CTG_STAT_ADD(gCtgMgmt.stat.runtime.qDoneNum); 
D
dapan1121 已提交
1771 1772
  }

D
dapan 已提交
1773
  CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock);
D
dapan1121 已提交
1774 1775 1776 1777 1778 1779 1780 1781 1782 1783 1784 1785

  qInfo("catalog update thread stopped");
  
  return NULL;
}


int32_t ctgStartUpdateThread() {
  pthread_attr_t thAttr;
  pthread_attr_init(&thAttr);
  pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);

D
dapan 已提交
1786
  if (pthread_create(&gCtgMgmt.updateThread, &thAttr, ctgUpdateThreadFunc, NULL) != 0) {
D
dapan1121 已提交
1787 1788
    terrno = TAOS_SYSTEM_ERROR(errno);
    CTG_ERR_RET(terrno);
D
dapan1121 已提交
1789 1790
  }
  
D
dapan1121 已提交
1791
  pthread_attr_destroy(&thAttr);
D
dapan1121 已提交
1792 1793 1794 1795
  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
1796
int32_t catalogInit(SCatalogCfg *cfg) {
D
dapan 已提交
1797
  if (gCtgMgmt.pCluster) {
D
dapan 已提交
1798
    qError("catalog already initialized");
D
dapan1121 已提交
1799
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
1800 1801
  }

D
dapan 已提交
1802
  atomic_store_8(&gCtgMgmt.exit, false);
D
dapan1121 已提交
1803

D
dapan1121 已提交
1804
  if (cfg) {
D
dapan 已提交
1805
    memcpy(&gCtgMgmt.cfg, cfg, sizeof(*cfg));
H
Haojun Liao 已提交
1806

D
dapan 已提交
1807 1808
    if (gCtgMgmt.cfg.maxDBCacheNum == 0) {
      gCtgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
D
dapan1121 已提交
1809 1810
    }

D
dapan 已提交
1811 1812
    if (gCtgMgmt.cfg.maxTblCacheNum == 0) {
      gCtgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TBLMETA_NUMBER;
D
dapan1121 已提交
1813
    }
D
dapan1121 已提交
1814

D
dapan 已提交
1815 1816
    if (gCtgMgmt.cfg.dbRentSec == 0) {
      gCtgMgmt.cfg.dbRentSec = CTG_DEFAULT_RENT_SECOND;
D
dapan1121 已提交
1817 1818
    }

D
dapan 已提交
1819 1820
    if (gCtgMgmt.cfg.stbRentSec == 0) {
      gCtgMgmt.cfg.stbRentSec = CTG_DEFAULT_RENT_SECOND;
D
dapan1121 已提交
1821
    }
D
dapan1121 已提交
1822
  } else {
D
dapan 已提交
1823 1824 1825 1826
    gCtgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
    gCtgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TBLMETA_NUMBER;
    gCtgMgmt.cfg.dbRentSec = CTG_DEFAULT_RENT_SECOND;
    gCtgMgmt.cfg.stbRentSec = CTG_DEFAULT_RENT_SECOND;
D
dapan 已提交
1827 1828
  }

D
dapan 已提交
1829 1830
  gCtgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == gCtgMgmt.pCluster) {
D
dapan1121 已提交
1831 1832
    qError("taosHashInit %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER);
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
1833 1834
  }

D
dapan1121 已提交
1835 1836
  CTG_ERR_RET(ctgStartUpdateThread());

D
dapan 已提交
1837
  tsem_init(&gCtgMgmt.sem, 0, 0);
D
dapan1121 已提交
1838

D
dapan 已提交
1839 1840
  gCtgMgmt.head = calloc(1, sizeof(SCtgQNode));
  if (NULL == gCtgMgmt.head) {
D
dapan1121 已提交
1841 1842 1843
    qError("calloc %d failed", (int32_t)sizeof(SCtgQNode));
    CTG_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
D
dapan 已提交
1844
  gCtgMgmt.tail = gCtgMgmt.head;
D
dapan1121 已提交
1845

D
dapan 已提交
1846
  qDebug("catalog initialized, maxDb:%u, maxTbl:%u, dbRentSec:%u, stbRentSec:%u", gCtgMgmt.cfg.maxDBCacheNum, gCtgMgmt.cfg.maxTblCacheNum, gCtgMgmt.cfg.dbRentSec, gCtgMgmt.cfg.stbRentSec);
D
dapan1121 已提交
1847

D
dapan 已提交
1848
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1849 1850
}

D
dapan1121 已提交
1851
int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
1852
  if (NULL == catalogHandle) {
D
dapan1121 已提交
1853
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan 已提交
1854 1855
  }

D
dapan 已提交
1856
  if (NULL == gCtgMgmt.pCluster) {
D
dapan 已提交
1857
    qError("catalog cluster cache are not ready, clusterId:%"PRIx64, clusterId);
D
dapan1121 已提交
1858
    CTG_ERR_RET(TSDB_CODE_CTG_NOT_READY);
D
dapan 已提交
1859 1860
  }

D
dapan1121 已提交
1861 1862
  int32_t code = 0;
  SCatalog *clusterCtg = NULL;
D
dapan 已提交
1863

D
dapan1121 已提交
1864
  while (true) {
D
dapan 已提交
1865
    SCatalog **ctg = (SCatalog **)taosHashGet(gCtgMgmt.pCluster, (char*)&clusterId, sizeof(clusterId));
D
dapan 已提交
1866

D
dapan1121 已提交
1867 1868 1869 1870 1871
    if (ctg && (*ctg)) {
      *catalogHandle = *ctg;
      qDebug("got catalog handle from cache, clusterId:%"PRIx64", CTG:%p", clusterId, *ctg);
      return TSDB_CODE_SUCCESS;
    }
D
dapan 已提交
1872

D
dapan1121 已提交
1873 1874 1875 1876 1877 1878
    clusterCtg = calloc(1, sizeof(SCatalog));
    if (NULL == clusterCtg) {
      qError("calloc %d failed", (int32_t)sizeof(SCatalog));
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }

D
dapan1121 已提交
1879 1880
    clusterCtg->clusterId = clusterId;

D
dapan 已提交
1881 1882
    CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->dbRent, gCtgMgmt.cfg.dbRentSec, CTG_RENT_DB));
    CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->stbRent, gCtgMgmt.cfg.stbRentSec, CTG_RENT_STABLE));
D
dapan1121 已提交
1883

D
dapan 已提交
1884
    clusterCtg->dbCache = taosHashInit(gCtgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
D
dapan1121 已提交
1885 1886 1887 1888 1889
    if (NULL == clusterCtg->dbCache) {
      qError("taosHashInit %d dbCache failed", CTG_DEFAULT_CACHE_DB_NUMBER);
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
    }

D
dapan 已提交
1890
    SHashObj *metaCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
D
dapan1121 已提交
1891
    if (NULL == metaCache) {
D
dapan 已提交
1892
      qError("taosHashInit failed, num:%d", gCtgMgmt.cfg.maxTblCacheNum);
D
dapan1121 已提交
1893 1894 1895
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }
    
D
dapan 已提交
1896
    code = taosHashPut(gCtgMgmt.pCluster, &clusterId, sizeof(clusterId), &clusterCtg, POINTER_BYTES);
D
dapan1121 已提交
1897 1898 1899 1900 1901 1902 1903 1904 1905 1906 1907 1908 1909
    if (code) {
      if (HASH_NODE_EXIST(code)) {
        ctgFreeHandle(clusterCtg);
        continue;
      }
      
      qError("taosHashPut CTG to cache failed, clusterId:%"PRIx64, clusterId);
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
    }

    qDebug("add CTG to cache, clusterId:%"PRIx64", CTG:%p", clusterId, clusterCtg);

    break;
D
dapan 已提交
1910
  }
D
dapan1121 已提交
1911 1912

  *catalogHandle = clusterCtg;
D
dapan 已提交
1913
  
D
dapan1121 已提交
1914
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1915 1916 1917 1918 1919 1920 1921 1922

_return:

  ctgFreeHandle(clusterCtg);
  
  CTG_RET(code);
}

D
dapan1121 已提交
1923 1924
void catalogFreeHandle(SCatalog* pCtg) {
  if (NULL == pCtg) {
D
dapan1121 已提交
1925 1926
    return;
  }
D
dapan1121 已提交
1927

D
dapan 已提交
1928
  if (taosHashRemove(gCtgMgmt.pCluster, &pCtg->clusterId, sizeof(pCtg->clusterId))) {
D
dapan1121 已提交
1929
    ctgWarn("taosHashRemove from cluster failed, may already be freed, clusterId:%"PRIx64, pCtg->clusterId);
D
dapan1121 已提交
1930 1931 1932
    return;
  }

D
dapan1121 已提交
1933
  uint64_t clusterId = pCtg->clusterId;
D
dapan1121 已提交
1934
  
D
dapan1121 已提交
1935
  ctgFreeHandle(pCtg);
D
dapan1121 已提交
1936 1937

  ctgInfo("handle freed, culsterId:%"PRIx64, clusterId);
D
dapan 已提交
1938 1939
}

D
dapan1121 已提交
1940
int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version) {
D
dapan1121 已提交
1941 1942
  CTG_API_ENTER();

D
dapan1121 已提交
1943 1944 1945 1946 1947
  if (NULL == pCtg || NULL == dbFName || NULL == version) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }

  if (NULL == pCtg->dbCache) {
D
dapan1121 已提交
1948
    *version = CTG_DEFAULT_INVALID_VERSION;
D
dapan1121 已提交
1949
    ctgInfo("empty db cache, dbFName:%s", dbFName);
D
dapan1121 已提交
1950
    CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
1951 1952
  }

D
dapan1121 已提交
1953 1954 1955
  SCtgDBCache *dbCache = NULL;
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
  if (NULL == dbCache) {
D
dapan1121 已提交
1956
    *version = CTG_DEFAULT_INVALID_VERSION;
D
dapan1121 已提交
1957
    CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
1958 1959
  }

D
dapan 已提交
1960 1961 1962
  bool inCache = false;
  ctgAcquireVgInfo(pCtg, dbCache, &inCache);
  if (!inCache) {
D
dapan1121 已提交
1963
    ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
1964 1965

    *version = CTG_DEFAULT_INVALID_VERSION;
D
dapan1121 已提交
1966
    CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
1967 1968
  }

D
dapan1121 已提交
1969 1970 1971 1972
  *version = dbCache->vgInfo->vgVersion;

  ctgReleaseVgInfo(dbCache);
  ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
1973

D
dapan1121 已提交
1974
  ctgDebug("Got db vgVersion from cache, dbFName:%s, vgVersion:%d", dbFName, *version);
D
dapan1121 已提交
1975

D
dapan1121 已提交
1976
  CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
1977 1978
}

D
dapan1121 已提交
1979
int32_t catalogGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, bool forceUpdate, SArray** vgroupList) {
D
dapan1121 已提交
1980 1981
  CTG_API_ENTER();

D
dapan1121 已提交
1982 1983 1984
  if (NULL == pCtg || NULL == dbFName || NULL == pRpc || NULL == pMgmtEps || NULL == vgroupList) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }
1985

D
dapan1121 已提交
1986
  SCtgDBCache* dbCache = NULL;
1987
  int32_t code = 0;
D
dapan1121 已提交
1988
  SArray *vgList = NULL;
D
dapan1121 已提交
1989 1990 1991 1992 1993 1994 1995
  SHashObj *vgHash = NULL;
  SDBVgInfo *vgInfo = NULL;
  CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pRpc, pMgmtEps, dbFName, forceUpdate, &dbCache, &vgInfo));
  if (dbCache) {
    vgHash = dbCache->vgInfo->vgHash;
  } else {
    vgHash = vgInfo->vgHash;
D
dapan1121 已提交
1996 1997
  }

D
dapan1121 已提交
1998
  CTG_ERR_JRET(ctgGenerateVgList(pCtg, vgHash, &vgList));
D
dapan1121 已提交
1999 2000 2001 2002 2003

  *vgroupList = vgList;
  vgList = NULL;

_return:
D
dapan1121 已提交
2004 2005

  if (dbCache) {
D
dapan1121 已提交
2006 2007
    ctgReleaseVgInfo(dbCache);
    ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
2008 2009
  }

D
dapan1121 已提交
2010 2011 2012
  if (vgInfo) {
    taosHashCleanup(vgInfo->vgHash);
    tfree(vgInfo);
D
dapan1121 已提交
2013 2014
  }

D
dapan1121 已提交
2015
  CTG_API_LEAVE(code);  
D
dapan1121 已提交
2016 2017 2018
}


D
dapan1121 已提交
2019
int32_t catalogUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SDBVgInfo* dbInfo) {
D
dapan1121 已提交
2020
  CTG_API_ENTER();
D
dapan1121 已提交
2021 2022

  int32_t code = 0;
D
dapan1121 已提交
2023
  
D
dapan1121 已提交
2024
  if (NULL == pCtg || NULL == dbFName || NULL == dbInfo) {
D
dapan1121 已提交
2025 2026 2027
    CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT);
  }

D
dapan1121 已提交
2028 2029 2030 2031
  SCtgMetaAction action= {.act = CTG_ACT_UPDATE_VG};
  SCtgUpdateVgMsg *msg = malloc(sizeof(SCtgUpdateVgMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg));
D
dapan1121 已提交
2032
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
2033 2034
  }

D
dapan1121 已提交
2035 2036 2037 2038
  msg->pCtg = pCtg;
  strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  msg->dbId = dbId;
  msg->dbInfo = dbInfo;
D
dapan1121 已提交
2039
  dbInfo = NULL;
D
dapan1121 已提交
2040

D
dapan1121 已提交
2041
  action.data = msg;
D
dapan1121 已提交
2042

D
dapan1121 已提交
2043
  CTG_ERR_JRET(ctgPushAction(&action));
D
dapan1121 已提交
2044

D
dapan 已提交
2045 2046
  ctgDebug("action [%s] added into queue", gCtgAction[action.act].name);

D
dapan1121 已提交
2047 2048
  CTG_API_LEAVE(code);
  
D
dapan1121 已提交
2049 2050
_return:

D
dapan1121 已提交
2051 2052 2053
  if (dbInfo) {
    taosHashCleanup(dbInfo->vgHash);
    tfree(dbInfo);
D
dapan1121 已提交
2054
  }
D
dapan1121 已提交
2055 2056

  tfree(msg);
D
dapan1121 已提交
2057
  
D
dapan1121 已提交
2058
  CTG_API_LEAVE(code);
D
dapan1121 已提交
2059 2060 2061
}


D
dapan1121 已提交
2062 2063 2064
int32_t catalogRemoveDB(SCatalog* pCtg, const char* dbFName, uint64_t dbId) {
  CTG_API_ENTER();

D
dapan1121 已提交
2065 2066
  int32_t code = 0;
  
D
dapan1121 已提交
2067 2068
  if (NULL == pCtg || NULL == dbFName) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
2069 2070
  }

D
dapan1121 已提交
2071
  if (NULL == pCtg->dbCache) {
D
dapan1121 已提交
2072
    CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
2073
  }
D
dapan1121 已提交
2074

D
dapan1121 已提交
2075 2076 2077 2078 2079
  SCtgMetaAction action= {.act = CTG_ACT_REMOVE_DB};
  SCtgRemoveDBMsg *msg = malloc(sizeof(SCtgRemoveDBMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveDBMsg));
    CTG_API_LEAVE(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
2080
  }
D
dapan1121 已提交
2081

D
dapan1121 已提交
2082 2083 2084
  msg->pCtg = pCtg;
  strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  msg->dbId = dbId;
D
dapan1121 已提交
2085

D
dapan1121 已提交
2086 2087 2088 2089
  action.data = msg;

  CTG_ERR_JRET(ctgPushAction(&action));

D
dapan 已提交
2090 2091
  ctgDebug("action [%s] added into queue", gCtgAction[action.act].name);

D
dapan1121 已提交
2092
  CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
2093
  
D
dapan1121 已提交
2094 2095 2096
_return:

  tfree(action.data);
D
dapan1121 已提交
2097

D
dapan1121 已提交
2098
  CTG_API_LEAVE(code);
D
dapan1121 已提交
2099 2100
}

D
dapan1121 已提交
2101 2102 2103
int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, const char* stbName, uint64_t suid) {
  CTG_API_ENTER();

D
dapan 已提交
2104 2105
  int32_t code = 0;
  
D
dapan1121 已提交
2106 2107
  if (NULL == pCtg || NULL == dbFName || NULL == stbName) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan 已提交
2108 2109
  }

D
dapan1121 已提交
2110
  if (NULL == pCtg->dbCache) {
D
dapan1121 已提交
2111
    CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan 已提交
2112
  }
D
dapan1121 已提交
2113 2114 2115 2116 2117 2118

  SCtgMetaAction action= {.act = CTG_ACT_REMOVE_STB};
  SCtgRemoveStbMsg *msg = malloc(sizeof(SCtgRemoveStbMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveStbMsg));
    CTG_API_LEAVE(TSDB_CODE_CTG_MEM_ERROR);
D
dapan 已提交
2119 2120
  }

D
dapan1121 已提交
2121 2122 2123 2124 2125
  msg->pCtg = pCtg;
  strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  strncpy(msg->stbName, stbName, sizeof(msg->stbName));
  msg->dbId = dbId;
  msg->suid = suid;
D
dapan1121 已提交
2126

D
dapan1121 已提交
2127 2128 2129 2130
  action.data = msg;

  CTG_ERR_JRET(ctgPushAction(&action));

D
dapan 已提交
2131 2132
  ctgDebug("action [%s] added into queue", gCtgAction[action.act].name);

D
dapan1121 已提交
2133
  CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan 已提交
2134
  
D
dapan1121 已提交
2135 2136 2137 2138
_return:

  tfree(action.data);

D
dapan1121 已提交
2139
  CTG_API_LEAVE(code);
D
dapan 已提交
2140 2141
}

D
dapan1121 已提交
2142

D
dapan1121 已提交
2143
int32_t catalogGetTableMeta(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
D
dapan1121 已提交
2144 2145
  CTG_API_ENTER();

D
dapan1121 已提交
2146
  CTG_API_LEAVE(ctgGetTableMeta(pCtg, pTransporter, pMgmtEps, pTableName, false, pTableMeta, -1));
D
dapan1121 已提交
2147
}
D
dapan1121 已提交
2148

D
dapan1121 已提交
2149
int32_t catalogGetSTableMeta(SCatalog* pCtg, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
D
dapan1121 已提交
2150 2151
  CTG_API_ENTER();

D
dapan1121 已提交
2152
  CTG_API_LEAVE(ctgGetTableMeta(pCtg, pTransporter, pMgmtEps, pTableName, false, pTableMeta, 1));
D
dapan1121 已提交
2153 2154
}

D
dapan1121 已提交
2155 2156 2157 2158 2159 2160 2161 2162 2163 2164 2165 2166 2167
int32_t catalogUpdateSTableMeta(SCatalog* pCtg, STableMetaRsp *rspMsg) {
  CTG_API_ENTER();

  if (NULL == pCtg || NULL == rspMsg) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }

  STableMetaOutput *output = calloc(1, sizeof(STableMetaOutput));
  if (NULL == output) {
    ctgError("malloc %d failed", (int32_t)sizeof(STableMetaOutput));
    CTG_API_LEAVE(TSDB_CODE_CTG_MEM_ERROR);
  }
  
D
dapan1121 已提交
2168 2169
  int32_t code = 0;

D
dapan1121 已提交
2170 2171
  strcpy(output->dbFName, rspMsg->dbFName);
  strcpy(output->tbName, rspMsg->tbName);
D
dapan1121 已提交
2172

D
dapan1121 已提交
2173
  output->dbId = rspMsg->dbId;
D
dapan1121 已提交
2174
  
D
dapan1121 已提交
2175
  SET_META_TYPE_TABLE(output->metaType);
D
dapan1121 已提交
2176
  
D
dapan1121 已提交
2177
  CTG_ERR_JRET(queryCreateTableMetaFromMsg(rspMsg, true, &output->tbMeta));
D
dapan1121 已提交
2178

D
dapan1121 已提交
2179 2180 2181 2182 2183 2184
  SCtgMetaAction action= {.act = CTG_ACT_UPDATE_TBL};
  SCtgUpdateTblMsg *msg = malloc(sizeof(SCtgUpdateTblMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTblMsg));
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
  }
D
dapan1121 已提交
2185

D
dapan1121 已提交
2186 2187 2188 2189 2190 2191 2192
  msg->pCtg = pCtg;
  msg->output = output;

  action.data = msg;

  CTG_ERR_JRET(ctgPushAction(&action));

D
dapan 已提交
2193 2194
  ctgDebug("action [%s] added into queue", gCtgAction[action.act].name);

D
dapan1121 已提交
2195 2196
  CTG_API_LEAVE(code);
  
D
dapan1121 已提交
2197 2198
_return:

D
dapan1121 已提交
2199 2200 2201
  tfree(output->tbMeta);
  tfree(output);
  tfree(msg);
D
dapan1121 已提交
2202
  
D
dapan1121 已提交
2203
  CTG_API_LEAVE(code);
D
dapan1121 已提交
2204 2205 2206
}


D
dapan1121 已提交
2207
int32_t catalogRefreshTableMeta(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) {
D
dapan1121 已提交
2208 2209
  CTG_API_ENTER();

D
dapan1121 已提交
2210 2211 2212 2213 2214
  if (NULL == pCtg || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }

  CTG_API_LEAVE(ctgRefreshTblMeta(pCtg, pTransporter, pMgmtEps, pTableName, isSTable, NULL));
2215
}
2216

D
dapan1121 已提交
2217
int32_t catalogRefreshGetTableMeta(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable) {
D
dapan1121 已提交
2218 2219
  CTG_API_ENTER();

D
dapan1121 已提交
2220
  CTG_API_LEAVE(ctgGetTableMeta(pCtg, pTransporter, pMgmtEps, pTableName, true, pTableMeta, isSTable));
D
dapan1121 已提交
2221 2222
}

D
dapan1121 已提交
2223
int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgList) {
D
dapan1121 已提交
2224
  CTG_API_ENTER();
D
dapan1121 已提交
2225 2226 2227 2228

  if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pVgList) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }
D
dapan1121 已提交
2229 2230 2231 2232
  
  STableMeta *tbMeta = NULL;
  int32_t code = 0;
  SVgroupInfo vgroupInfo = {0};
D
dapan1121 已提交
2233
  SCtgDBCache* dbCache = NULL;
D
dapan1121 已提交
2234
  SArray *vgList = NULL;
D
dapan 已提交
2235
  SDBVgInfo *vgInfo = NULL;
D
dapan1121 已提交
2236

D
dapan1121 已提交
2237
  *pVgList = NULL;
D
dapan1121 已提交
2238
  
D
dapan1121 已提交
2239
  CTG_ERR_JRET(ctgGetTableMeta(pCtg, pRpc, pMgmtEps, pTableName, false, &tbMeta, -1));
D
dapan1121 已提交
2240

H
Haojun Liao 已提交
2241 2242
  char db[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, db);
D
dapan 已提交
2243

D
dapan1121 已提交
2244 2245 2246 2247 2248 2249 2250 2251 2252 2253
  SHashObj *vgHash = NULL;  
  CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pRpc, pMgmtEps, db, false, &dbCache, &vgInfo));

  if (dbCache) {
    vgHash = dbCache->vgInfo->vgHash;
  } else {
    vgHash = vgInfo->vgHash;
  }

  /* TODO REMOEV THIS ....
D
dapan 已提交
2254 2255 2256
  if (0 == tbMeta->vgId) {
    SVgroupInfo vgroup = {0};
    
D
dapan1121 已提交
2257
    catalogGetTableHashVgroup(pCtg, pRpc, pMgmtEps, pTableName, &vgroup);
D
dapan 已提交
2258 2259 2260

    tbMeta->vgId = vgroup.vgId;
  }
D
dapan1121 已提交
2261
  // TODO REMOVE THIS ....*/
D
dapan 已提交
2262

2263
  if (tbMeta->tableType == TSDB_SUPER_TABLE) {
D
dapan1121 已提交
2264
    CTG_ERR_JRET(ctgGenerateVgList(pCtg, vgHash, pVgList));
D
dapan1121 已提交
2265
  } else {
2266
    int32_t vgId = tbMeta->vgId;
D
dapan1121 已提交
2267
    if (NULL == taosHashGetClone(vgHash, &vgId, sizeof(vgId), &vgroupInfo)) {
2268
      ctgError("table's vgId not found in vgroup list, vgId:%d, tbName:%s", vgId, tNameGetTableName(pTableName));
D
dapan 已提交
2269
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);    
2270
    }
D
dapan1121 已提交
2271

D
dapan1121 已提交
2272 2273
    vgList = taosArrayInit(1, sizeof(SVgroupInfo));
    if (NULL == vgList) {
D
dapan1121 已提交
2274
      ctgError("taosArrayInit %d failed", (int32_t)sizeof(SVgroupInfo));
D
dapan 已提交
2275 2276 2277
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);    
    }

D
dapan1121 已提交
2278
    if (NULL == taosArrayPush(vgList, &vgroupInfo)) {
2279
      ctgError("taosArrayPush vgroupInfo to array failed, vgId:%d, tbName:%s", vgId, tNameGetTableName(pTableName));
D
dapan1121 已提交
2280 2281
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
    }
D
dapan 已提交
2282

D
dapan1121 已提交
2283
    *pVgList = vgList;
D
dapan1121 已提交
2284 2285
    vgList = NULL;
  }
D
dapan 已提交
2286

D
dapan1121 已提交
2287
_return:
D
dapan 已提交
2288

D
dapan1121 已提交
2289
  if (dbCache) {
D
dapan1121 已提交
2290 2291 2292 2293 2294 2295 2296 2297 2298
    ctgReleaseVgInfo(dbCache);
    ctgReleaseDBCache(pCtg, dbCache);
  }

  tfree(tbMeta);

  if (vgInfo) {
    taosHashCleanup(vgInfo->vgHash);
    tfree(vgInfo);
D
dapan1121 已提交
2299 2300 2301 2302 2303 2304
  }

  if (vgList) {
    taosArrayDestroy(vgList);
    vgList = NULL;
  }
D
dapan1121 已提交
2305
  
D
dapan1121 已提交
2306
  CTG_API_LEAVE(code);
D
dapan1121 已提交
2307 2308 2309
}


D
dapan1121 已提交
2310
int32_t catalogGetTableHashVgroup(SCatalog *pCtg, void *pTransporter, const SEpSet *pMgmtEps, const SName *pTableName, SVgroupInfo *pVgroup) {
D
dapan1121 已提交
2311 2312
  CTG_API_ENTER();

D
dapan1121 已提交
2313 2314
  SCtgDBCache* dbCache = NULL;
  int32_t code = 0;
H
Haojun Liao 已提交
2315 2316
  char db[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, db);
D
dapan1121 已提交
2317

D
dapan1121 已提交
2318 2319
  SDBVgInfo *vgInfo = NULL;
  CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pTransporter, pMgmtEps, db, false, &dbCache, &vgInfo));
D
dapan1121 已提交
2320

D
dapan1121 已提交
2321
  CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, vgInfo ? vgInfo : dbCache->vgInfo, pTableName, pVgroup));
D
dapan1121 已提交
2322

D
dapan1121 已提交
2323
_return:
D
dapan1121 已提交
2324

D
dapan1121 已提交
2325
  if (dbCache) {
D
dapan1121 已提交
2326 2327 2328 2329 2330 2331 2332
    ctgReleaseVgInfo(dbCache);
    ctgReleaseDBCache(pCtg, dbCache);
  }

  if (vgInfo) {
    taosHashCleanup(vgInfo->vgHash);
    tfree(vgInfo);
D
dapan1121 已提交
2333
  }
D
dapan1121 已提交
2334

D
dapan1121 已提交
2335
  CTG_API_LEAVE(code);
D
dapan1121 已提交
2336 2337 2338
}


D
dapan1121 已提交
2339
int32_t catalogGetAllMeta(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp) {
D
dapan1121 已提交
2340 2341
  CTG_API_ENTER();

D
dapan1121 已提交
2342 2343 2344 2345
  if (NULL == pCtg || NULL == pTransporter || NULL == pMgmtEps || NULL == pReq || NULL == pRsp) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }

D
dapan1121 已提交
2346
  int32_t code = 0;
D
dapan1121 已提交
2347
  pRsp->pTableMeta = NULL;
D
dapan1121 已提交
2348 2349 2350

  if (pReq->pTableName) {
    int32_t tbNum = (int32_t)taosArrayGetSize(pReq->pTableName);
D
dapan1121 已提交
2351
    if (tbNum <= 0) {
D
dapan1121 已提交
2352
      ctgError("empty table name list, tbNum:%d", tbNum);
D
dapan1121 已提交
2353
      CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
2354
    }
H
Haojun Liao 已提交
2355

D
dapan1121 已提交
2356 2357
    pRsp->pTableMeta = taosArrayInit(tbNum, POINTER_BYTES);
    if (NULL == pRsp->pTableMeta) {
D
dapan1121 已提交
2358
      ctgError("taosArrayInit %d failed", tbNum);
D
dapan1121 已提交
2359
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
2360 2361 2362 2363 2364 2365
    }
    
    for (int32_t i = 0; i < tbNum; ++i) {
      SName *name = taosArrayGet(pReq->pTableName, i);
      STableMeta *pTableMeta = NULL;
      
D
dapan1121 已提交
2366
      CTG_ERR_JRET(ctgGetTableMeta(pCtg, pTransporter, pMgmtEps, name, false, &pTableMeta, -1));
D
dapan1121 已提交
2367 2368 2369 2370 2371 2372 2373 2374 2375

      if (NULL == taosArrayPush(pRsp->pTableMeta, &pTableMeta)) {
        ctgError("taosArrayPush failed, idx:%d", i);
        tfree(pTableMeta);
        CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
      }
    }
  }

D
dapan1121 已提交
2376
  CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
2377 2378

_return:  
D
dapan1121 已提交
2379

D
dapan1121 已提交
2380 2381 2382 2383 2384 2385 2386 2387
  if (pRsp->pTableMeta) {
    int32_t aSize = taosArrayGetSize(pRsp->pTableMeta);
    for (int32_t i = 0; i < aSize; ++i) {
      STableMeta *pMeta = taosArrayGetP(pRsp->pTableMeta, i);
      tfree(pMeta);
    }
    
    taosArrayDestroy(pRsp->pTableMeta);
D
dapan1121 已提交
2388
    pRsp->pTableMeta = NULL;
D
dapan1121 已提交
2389
  }
D
dapan 已提交
2390
  
D
dapan1121 已提交
2391
  CTG_API_LEAVE(code);
2392
}
D
dapan 已提交
2393

D
dapan1121 已提交
2394
int32_t catalogGetQnodeList(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, SArray* pQnodeList) {
D
dapan1121 已提交
2395 2396
  CTG_API_ENTER();

D
dapan1121 已提交
2397 2398 2399 2400
  if (NULL == pCtg || NULL == pRpc  || NULL == pMgmtEps || NULL == pQnodeList) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }

D
dapan1121 已提交
2401
  //TODO
D
dapan 已提交
2402

D
dapan1121 已提交
2403
  CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan 已提交
2404 2405
}

D
dapan1121 已提交
2406
int32_t catalogGetExpiredSTables(SCatalog* pCtg, SSTableMetaVersion **stables, uint32_t *num) {
D
dapan1121 已提交
2407 2408
  CTG_API_ENTER();

D
dapan1121 已提交
2409 2410
  if (NULL == pCtg || NULL == stables || NULL == num) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
2411 2412
  }

D
dapan1121 已提交
2413 2414 2415 2416
  CTG_API_LEAVE(ctgMetaRentGet(&pCtg->stbRent, (void **)stables, num, sizeof(SSTableMetaVersion)));
}

int32_t catalogGetExpiredDBs(SCatalog* pCtg, SDbVgVersion **dbs, uint32_t *num) {
D
dapan1121 已提交
2417
  CTG_API_ENTER();
D
dapan1121 已提交
2418 2419 2420 2421
  
  if (NULL == pCtg || NULL == dbs || NULL == num) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }
D
dapan1121 已提交
2422

D
dapan1121 已提交
2423
  CTG_API_LEAVE(ctgMetaRentGet(&pCtg->dbRent, (void **)dbs, num, sizeof(SDbVgVersion)));
D
dapan1121 已提交
2424 2425
}

D
dapan 已提交
2426

D
dapan 已提交
2427
void catalogDestroy(void) {
D
dapan1121 已提交
2428 2429
  qInfo("start to destroy catalog");
  
D
dapan 已提交
2430
  if (NULL == gCtgMgmt.pCluster || atomic_load_8(&gCtgMgmt.exit)) {
D
dapan1121 已提交
2431 2432 2433
    return;
  }

D
dapan 已提交
2434 2435 2436
  atomic_store_8(&gCtgMgmt.exit, true);

  tsem_post(&gCtgMgmt.sem);
D
dapan1121 已提交
2437

D
dapan1121 已提交
2438 2439 2440 2441
  while (CTG_IS_LOCKED(&gCtgMgmt.lock)) {
    usleep(1);
  }
  
D
dapan 已提交
2442
  CTG_LOCK(CTG_WRITE, &gCtgMgmt.lock);
D
dapan1121 已提交
2443

D
dapan1121 已提交
2444
  SCatalog *pCtg = NULL;
D
dapan 已提交
2445
  void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
D
dapan1121 已提交
2446
  while (pIter) {
D
dapan1121 已提交
2447
    pCtg = *(SCatalog **)pIter;
D
dapan1121 已提交
2448

D
dapan1121 已提交
2449 2450
    if (pCtg) {
      catalogFreeHandle(pCtg);
D
dapan1121 已提交
2451 2452
    }
    
D
dapan 已提交
2453
    pIter = taosHashIterate(gCtgMgmt.pCluster, pIter);
D
dapan 已提交
2454
  }
D
dapan1121 已提交
2455
  
D
dapan 已提交
2456 2457
  taosHashCleanup(gCtgMgmt.pCluster);
  gCtgMgmt.pCluster = NULL;
D
dapan1121 已提交
2458

D
dapan 已提交
2459
  CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.lock);
D
dapan1121 已提交
2460

D
dapan1121 已提交
2461
  qInfo("catalog destroyed");
D
dapan 已提交
2462 2463 2464 2465
}