catalog.c 69.7 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
H
Haojun Liao 已提交
14 15
 */

D
dapan1121 已提交
16
#include "trpc.h"
D
dapan1121 已提交
17
#include "query.h"
D
dapan1121 已提交
18
#include "tname.h"
H
Haojun Liao 已提交
19
#include "catalogInt.h"
20

D
dapan 已提交
21 22 23 24 25 26
int32_t ctgActUpdateVg(SCtgMetaAction *action);
int32_t ctgActUpdateTbl(SCtgMetaAction *action);
int32_t ctgActRemoveDB(SCtgMetaAction *action);
int32_t ctgActRemoveStb(SCtgMetaAction *action);
int32_t ctgActRemoveTbl(SCtgMetaAction *action);

D
dapan 已提交
27
SCatalogMgmt gCtgMgmt = {0};
D
dapan1121 已提交
28
SCtgDebug gCTGDebug = {0};
D
dapan 已提交
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
SCtgAction gCtgAction[CTG_ACT_MAX] = {{
                            CTG_ACT_UPDATE_VG,
                            "update vgInfo",
                            ctgActUpdateVg
                          },
                          {
                            CTG_ACT_UPDATE_TBL,
                            "update tbMeta",
                            ctgActUpdateTbl
                          },
                          {
                            CTG_ACT_REMOVE_DB,
                            "remove DB",
                            ctgActRemoveDB
                          },
                          {
                            CTG_ACT_REMOVE_STB,
                            "remove stbMeta",
                            ctgActRemoveStb
                          },
                          {
                            CTG_ACT_REMOVE_TBL,
                            "remove tbMeta",
                            ctgActRemoveTbl
                          }
};

int32_t ctgDbgEnableDebug(char *option) {
  if (0 == strcasecmp(option, "lock")) {
    gCTGDebug.lockDebug = true;
    qDebug("lock debug enabled");
    return TSDB_CODE_SUCCESS;
  }

  if (0 == strcasecmp(option, "cache")) {
    gCTGDebug.cacheDebug = true;
    qDebug("cache debug enabled");
    return TSDB_CODE_SUCCESS;
  }

  if (0 == strcasecmp(option, "api")) {
    gCTGDebug.apiDebug = true;
    qDebug("api debug enabled");
    return TSDB_CODE_SUCCESS;
  }

  qError("invalid debug option:%s", option);
  
  return TSDB_CODE_CTG_INTERNAL_ERROR;
}

int32_t ctgDbgGetStatNum(char *option, void *res) {
  if (0 == strcasecmp(option, "runtime.qDoneNum")) {
    *(uint64_t *)res = atomic_load_64(&gCtgMgmt.stat.runtime.qDoneNum);
    return TSDB_CODE_SUCCESS;
  }
85

D
dapan 已提交
86 87 88 89
  qError("invalid stat option:%s", option);
  
  return TSDB_CODE_CTG_INTERNAL_ERROR;
}
D
dapan1121 已提交
90

D
dapan1121 已提交
91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
int32_t ctgDbgGetTbMetaNum(SCtgDBCache *dbCache) {
  return dbCache->tbCache.metaCache ? (int32_t)taosHashGetSize(dbCache->tbCache.metaCache) : 0;
}

int32_t ctgDbgGetStbNum(SCtgDBCache *dbCache) {
  return dbCache->tbCache.stbCache ? (int32_t)taosHashGetSize(dbCache->tbCache.stbCache) : 0;
}

int32_t ctgDbgGetRentNum(SCtgRentMgmt *rent) {
  int32_t num = 0;
  for (uint16_t i = 0; i < rent->slotNum; ++i) {
    SCtgRentSlot *slot = &rent->slots[i];
    if (NULL == slot->meta) {
      continue;
    }

    num += taosArrayGetSize(slot->meta);
  }

  return num;
}

D
dapan1121 已提交
113 114
int32_t ctgDbgGetClusterCacheNum(SCatalog* pCtg, int32_t type) {
  if (NULL == pCtg || NULL == pCtg->dbCache) {
D
dapan1121 已提交
115 116 117 118 119
    return 0;
  }

  switch (type) {
    case CTG_DBG_DB_NUM:
D
dapan1121 已提交
120
      return (int32_t)taosHashGetSize(pCtg->dbCache);
D
dapan1121 已提交
121
    case CTG_DBG_DB_RENT_NUM:
D
dapan1121 已提交
122
      return ctgDbgGetRentNum(&pCtg->dbRent);
D
dapan1121 已提交
123
    case CTG_DBG_STB_RENT_NUM:
D
dapan1121 已提交
124
      return ctgDbgGetRentNum(&pCtg->stbRent);
D
dapan1121 已提交
125 126 127 128 129 130
    default:
      break;
  }

  SCtgDBCache *dbCache = NULL;
  int32_t num = 0;
D
dapan1121 已提交
131
  void *pIter = taosHashIterate(pCtg->dbCache, NULL);
D
dapan1121 已提交
132 133 134 135 136 137 138 139 140 141 142 143 144
  while (pIter) {
    dbCache = (SCtgDBCache *)pIter;
    switch (type) {
      case CTG_DBG_META_NUM:
        num += ctgDbgGetTbMetaNum(dbCache);
        break;
      case CTG_DBG_STB_NUM:
        num += ctgDbgGetStbNum(dbCache);
        break;
      default:
        ctgError("invalid type:%d", type);
        break;
    }
D
dapan1121 已提交
145
    pIter = taosHashIterate(pCtg->dbCache, pIter);
D
dapan1121 已提交
146 147 148 149 150 151 152
  }

  return num;
}


void ctgDbgShowDBCache(SHashObj *dbHash) {
D
dapan1121 已提交
153 154 155 156 157 158 159 160 161 162 163 164 165
  if (NULL == dbHash) {
    return;
  }

  int32_t i = 0;
  SCtgDBCache *dbCache = NULL;
  void *pIter = taosHashIterate(dbHash, NULL);
  while (pIter) {
    char *dbFName = NULL;
    size_t len = 0;
    
    dbCache = (SCtgDBCache *)pIter;

D
dapan1121 已提交
166
    taosHashGetKey(dbCache, (void **)&dbFName, &len);
D
dapan1121 已提交
167
    
D
dapan1121 已提交
168
    CTG_CACHE_DEBUG("** %dth db [%.*s][%"PRIx64"] **", i, (int32_t)len, dbFName, dbCache->dbId);
D
dapan1121 已提交
169 170 171 172 173
    
    pIter = taosHashIterate(dbHash, pIter);
  }
}

D
dapan1121 已提交
174 175 176



D
dapan1121 已提交
177 178
void ctgDbgShowClusterCache(SCatalog* pCtg) {
  if (NULL == pCtg) {
D
dapan1121 已提交
179 180 181
    return;
  }

D
dapan1121 已提交
182 183 184
  CTG_CACHE_DEBUG("## cluster %"PRIx64" %p cache Info ##", pCtg->clusterId, pCtg);
  CTG_CACHE_DEBUG("db:%d meta:%d stb:%d dbRent:%d stbRent:%d", ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM), ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM), 
    ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_STB_NUM), ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_RENT_NUM), ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_STB_RENT_NUM));    
D
dapan1121 已提交
185
  
D
dapan1121 已提交
186
  ctgDbgShowDBCache(pCtg->dbCache);
D
dapan1121 已提交
187 188
}

D
dapan 已提交
189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219


void ctgPopAction(SCtgMetaAction **action) {
  SCtgQNode *orig = gCtgMgmt.head;
  
  SCtgQNode *node = gCtgMgmt.head->next;
  gCtgMgmt.head = gCtgMgmt.head->next;

  CTG_QUEUE_SUB();
  
  tfree(orig);

  *action = &node->action;
}


int32_t ctgPushAction(SCtgMetaAction *action) {
  SCtgQNode *node = calloc(1, sizeof(SCtgQNode));
  if (NULL == node) {
    qError("calloc %d failed", (int32_t)sizeof(SCtgQNode));
    CTG_RET(TSDB_CODE_CTG_MEM_ERROR);
  }
  
  node->action = *action;

  CTG_LOCK(CTG_WRITE, &gCtgMgmt.qlock);
  gCtgMgmt.tail->next = node;
  gCtgMgmt.tail = node;
  CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.qlock);

  CTG_QUEUE_ADD();
D
dapan1121 已提交
220
  CTG_STAT_ADD(gCtgMgmt.stat.runtime.qNum);
D
dapan 已提交
221 222 223 224 225 226 227

  tsem_post(&gCtgMgmt.sem);

  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244
void ctgFreeMetaRent(SCtgRentMgmt *mgmt) {
  if (NULL == mgmt->slots) {
    return;
  }

  for (int32_t i = 0; i < mgmt->slotNum; ++i) {
    SCtgRentSlot *slot = &mgmt->slots[i];
    if (slot->meta) {
      taosArrayDestroy(slot->meta);
      slot->meta = NULL;
    }
  }

  tfree(mgmt->slots);
}


D
dapan1121 已提交
245 246 247 248 249
void ctgFreeTableMetaCache(SCtgTbMetaCache *cache) {
  CTG_LOCK(CTG_WRITE, &cache->stbLock);
  if (cache->stbCache) {
    taosHashCleanup(cache->stbCache);
    cache->stbCache = NULL;
D
dapan1121 已提交
250
  }
D
dapan1121 已提交
251
  CTG_UNLOCK(CTG_WRITE, &cache->stbLock);
D
dapan1121 已提交
252

D
dapan1121 已提交
253 254 255 256
  CTG_LOCK(CTG_WRITE, &cache->metaLock);
  if (cache->metaCache) {
    taosHashCleanup(cache->metaCache);
    cache->metaCache = NULL;
D
dapan1121 已提交
257
  }
D
dapan1121 已提交
258
  CTG_UNLOCK(CTG_WRITE, &cache->metaLock);
D
dapan1121 已提交
259 260
}

D
dapan1121 已提交
261 262 263 264 265 266 267 268 269 270 271 272 273
void ctgFreeVgInfo(SDBVgInfo *vgInfo) {
  if (NULL == vgInfo) {
    return;
  }

  if (vgInfo->vgHash) {
    taosHashCleanup(vgInfo->vgHash);
    vgInfo->vgHash = NULL;
  }
  
  tfree(vgInfo);
}

D
dapan1121 已提交
274 275 276 277 278
void ctgFreeDbCache(SCtgDBCache *dbCache) {
  if (NULL == dbCache) {
    return;
  }

D
dapan1121 已提交
279
  CTG_LOCK(CTG_WRITE, &dbCache->vgLock);
D
dapan1121 已提交
280
  ctgFreeVgInfo (dbCache->vgInfo);
D
dapan1121 已提交
281
  CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
D
dapan1121 已提交
282 283 284 285 286

  ctgFreeTableMetaCache(&dbCache->tbCache);
}


D
dapan1121 已提交
287 288 289 290 291 292
void ctgFreeHandle(SCatalog* pCtg) {
  ctgFreeMetaRent(&pCtg->dbRent);
  ctgFreeMetaRent(&pCtg->stbRent);
  
  if (pCtg->dbCache) {
    void *pIter = taosHashIterate(pCtg->dbCache, NULL);
D
dapan1121 已提交
293 294 295
    while (pIter) {
      SCtgDBCache *dbCache = pIter;

D
dapan1121 已提交
296 297
      atomic_store_8(&dbCache->deleted, 1);

D
dapan1121 已提交
298
      ctgFreeDbCache(dbCache);
D
dapan1121 已提交
299 300
            
      pIter = taosHashIterate(pCtg->dbCache, pIter);
D
dapan1121 已提交
301 302
    }  

D
dapan1121 已提交
303
    taosHashCleanup(pCtg->dbCache);
D
dapan1121 已提交
304 305
  }
  
D
dapan1121 已提交
306
  free(pCtg);
D
dapan1121 已提交
307 308
}

D
dapan1121 已提交
309

D
dapan 已提交
310
int32_t ctgAcquireVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache, bool *inCache) {
D
dapan1121 已提交
311 312
  CTG_LOCK(CTG_READ, &dbCache->vgLock);
  
D
dapan 已提交
313 314 315 316 317 318 319 320 321 322
  if (dbCache->deleted) {
    CTG_UNLOCK(CTG_READ, &dbCache->vgLock);

    ctgDebug("db is dropping, dbId:%"PRIx64, dbCache->dbId);
    
    *inCache = false;
    return TSDB_CODE_SUCCESS;
  }

  
D
dapan1121 已提交
323 324 325
  if (NULL == dbCache->vgInfo) {
    CTG_UNLOCK(CTG_READ, &dbCache->vgLock);

D
dapan 已提交
326
    *inCache = false;
D
dapan1121 已提交
327
    ctgDebug("db vgInfo is empty, dbId:%"PRIx64, dbCache->dbId);
D
dapan1121 已提交
328 329 330
    return TSDB_CODE_SUCCESS;
  }

D
dapan 已提交
331 332
  *inCache = true;
  
D
dapan1121 已提交
333 334 335 336 337 338 339 340 341 342 343 344 345 346
  return TSDB_CODE_SUCCESS;
}

int32_t ctgWAcquireVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache) {
  CTG_LOCK(CTG_WRITE, &dbCache->vgLock);

  if (dbCache->deleted) {
    ctgDebug("db is dropping, dbId:%"PRIx64, dbCache->dbId);
    CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
    CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
  }

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
347

D
dapan1121 已提交
348 349 350
void ctgReleaseDBCache(SCatalog *pCtg, SCtgDBCache *dbCache) {
  taosHashRelease(pCtg->dbCache, dbCache);
}
D
dapan1121 已提交
351

D
dapan1121 已提交
352 353 354
void ctgReleaseVgInfo(SCtgDBCache *dbCache) {
  CTG_UNLOCK(CTG_READ, &dbCache->vgLock);
}
D
dapan1121 已提交
355

D
dapan1121 已提交
356 357 358
void ctgWReleaseVgInfo(SCtgDBCache *dbCache) {
  CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
}
D
dapan1121 已提交
359

D
dapan1121 已提交
360

D
dapan 已提交
361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398
int32_t ctgAcquireDBCacheImpl(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache, bool acquire) {
  SCtgDBCache *dbCache = NULL;
  if (acquire) {
    dbCache = (SCtgDBCache *)taosHashAcquire(pCtg->dbCache, dbFName, strlen(dbFName));
  } else {
    dbCache = (SCtgDBCache *)taosHashGet(pCtg->dbCache, dbFName, strlen(dbFName));
  }
  
  if (NULL == dbCache) {
    *pCache = NULL;
    ctgDebug("db not in cache, dbFName:%s", dbFName);
    return TSDB_CODE_SUCCESS;
  }

  if (dbCache->deleted) {
    if (acquire) {
      ctgReleaseDBCache(pCtg, dbCache);
    }    
    
    *pCache = NULL;
    ctgDebug("db is removing from cache, dbFName:%s", dbFName);
    return TSDB_CODE_SUCCESS;
  }

  *pCache = dbCache;
    
  return TSDB_CODE_SUCCESS;
}

int32_t ctgAcquireDBCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache) {
  CTG_RET(ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, true));
}

int32_t ctgGetDBCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache) {
  CTG_RET(ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, false));
}


D
dapan1121 已提交
399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414
int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache, bool *inCache) {
  if (NULL == pCtg->dbCache) {
    *pCache = NULL;
    *inCache = false;
    ctgWarn("empty db cache, dbFName:%s", dbFName);
    return TSDB_CODE_SUCCESS;
  }

  SCtgDBCache *dbCache = NULL;
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
  if (NULL == dbCache) {  
    *pCache = NULL;
    *inCache = false;
    return TSDB_CODE_SUCCESS;
  }
  
D
dapan 已提交
415 416
  ctgAcquireVgInfo(pCtg, dbCache, inCache);
  if (!(*inCache)) {
D
dapan1121 已提交
417 418 419 420
    ctgReleaseDBCache(pCtg, dbCache);
  
    *pCache = NULL;
    return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
421
  }
D
dapan1121 已提交
422

D
dapan1121 已提交
423
  *pCache = dbCache;
D
dapan1121 已提交
424
  *inCache = true;
D
dapan1121 已提交
425

D
dapan1121 已提交
426
  ctgDebug("Got db vgInfo from cache, dbFName:%s", dbFName);
D
dapan1121 已提交
427 428 429 430 431 432
  
  return TSDB_CODE_SUCCESS;
}



D
dapan1121 已提交
433
int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, SBuildUseDBInput *input, SUseDbOutput *out) {
D
dapan1121 已提交
434 435 436
  char *msg = NULL;
  int32_t msgLen = 0;

D
dapan 已提交
437
  ctgDebug("try to get db vgInfo from mnode, dbFName:%s", input->db);
D
dapan1121 已提交
438 439 440 441 442 443

  int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)](input, &msg, 0, &msgLen);
  if (code) {
    ctgError("Build use db msg failed, code:%x, db:%s", code, input->db);
    CTG_ERR_RET(code);
  }
D
ut test  
dapan1121 已提交
444
  
D
dapan1121 已提交
445
  SRpcMsg rpcMsg = {
H
Hongze Cheng 已提交
446
      .msgType = TDMT_MND_USE_DB,
D
catalog  
dapan1121 已提交
447
      .pCont   = msg,
D
dapan1121 已提交
448 449 450 451 452 453
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};

  rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
454
  if (TSDB_CODE_SUCCESS != rpcRsp.code) {
D
dapan1121 已提交
455 456 457 458 459
    if (CTG_DB_NOT_EXIST(rpcRsp.code)) {
      ctgDebug("db not exist in mnode, dbFName:%s", input->db);
      return rpcRsp.code;
    }
  
H
Haojun Liao 已提交
460
    ctgError("error rsp for use db, code:%s, db:%s", tstrerror(rpcRsp.code), input->db);
D
dapan1121 已提交
461
    CTG_ERR_RET(rpcRsp.code);
D
dapan1121 已提交
462
  }
D
dapan1121 已提交
463

D
dapan1121 已提交
464 465 466 467 468
  code = queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)](out, rpcRsp.pCont, rpcRsp.contLen);
  if (code) {
    ctgError("Process use db rsp failed, code:%x, db:%s", code, input->db);
    CTG_ERR_RET(code);
  }
D
dapan1121 已提交
469

D
dapan 已提交
470 471
  ctgDebug("Got db vgInfo from mnode, dbFName:%s", input->db);

D
dapan1121 已提交
472 473
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
474

D
dapan1121 已提交
475 476
int32_t ctgIsTableMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32_t *exist) {
  if (NULL == pCtg->dbCache) {
D
dapan1121 已提交
477
    *exist = 0;
D
dapan1121 已提交
478 479 480 481
    ctgWarn("empty db cache, dbFName:%s, tbName:%s", dbFName, tbName);
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
482 483
  SCtgDBCache *dbCache = NULL;
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
D
dapan1121 已提交
484 485
  if (NULL == dbCache) {
    *exist = 0;
D
dapan1121 已提交
486 487 488
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
489
  size_t sz = 0;
D
dapan1121 已提交
490 491 492 493
  CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);  
  STableMeta *tbMeta = taosHashGet(dbCache->tbCache.metaCache, tbName, strlen(tbName));
  CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
  
D
dapan1121 已提交
494
  if (NULL == tbMeta) {
D
dapan 已提交
495
    ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
496
    
D
dapan1121 已提交
497
    *exist = 0;
D
dapan1121 已提交
498
    ctgDebug("tbmeta not in cache, dbFName:%s, tbName:%s", dbFName, tbName);
D
dapan1121 已提交
499 500 501 502
    return TSDB_CODE_SUCCESS;
  }

  *exist = 1;
D
dapan1121 已提交
503

D
dapan 已提交
504
  ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
505
  
D
dapan1121 已提交
506
  ctgDebug("tbmeta is in cache, dbFName:%s, tbName:%s", dbFName, tbName);
D
dapan1121 已提交
507 508 509 510
  
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
511

D
dapan1121 已提交
512
int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STableMeta** pTableMeta, int32_t *exist, int32_t flag) {
D
dapan1121 已提交
513
  if (NULL == pCtg->dbCache) {
D
dapan1121 已提交
514
    *exist = 0;
D
dapan1121 已提交
515
    ctgWarn("empty tbmeta cache, tbName:%s", pTableName->tname);
D
dapan1121 已提交
516 517 518
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
519
  char dbFName[TSDB_DB_FNAME_LEN] = {0};
D
dapan1121 已提交
520 521 522 523 524
  if (CTG_IS_INF_DB(flag)) {
    strcpy(dbFName, pTableName->dbname);
  } else {
    tNameGetFullDbName(pTableName, dbFName);
  }
D
dapan1121 已提交
525

D
dapan1121 已提交
526 527
  *pTableMeta = NULL;

D
dapan1121 已提交
528 529
  SCtgDBCache *dbCache = NULL;
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
D
dapan1121 已提交
530 531 532 533 534
  if (NULL == dbCache) {
    *exist = 0;
    return TSDB_CODE_SUCCESS;
  }
  
D
dapan1121 已提交
535 536 537 538 539
  size_t sz = 0;  
  CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);
  STableMeta *tbMeta = taosHashGetCloneExt(dbCache->tbCache.metaCache, pTableName->tname, strlen(pTableName->tname), NULL, (void **)pTableMeta, &sz);
  CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);

D
dapan1121 已提交
540
  if (NULL == *pTableMeta) {
D
dapan1121 已提交
541
    *exist = 0;
D
dapan1121 已提交
542 543
    ctgReleaseDBCache(pCtg, dbCache);
    ctgDebug("tbl not in cache, dbFName:%s, tbName:%s", dbFName, pTableName->tname);
D
dapan1121 已提交
544 545 546
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
547
  *exist = 1;
D
dapan1121 已提交
548 549
  
  tbMeta = *pTableMeta;
D
dapan1121 已提交
550

D
dapan1121 已提交
551
  if (tbMeta->tableType != TSDB_CHILD_TABLE) {
D
dapan1121 已提交
552
    ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
553
    ctgDebug("Got meta from cache, type:%d, dbFName:%s, tbName:%s", tbMeta->tableType, dbFName, pTableName->tname);
D
dapan1121 已提交
554 555 556
    return TSDB_CODE_SUCCESS;
  }
  
D
dapan1121 已提交
557
  CTG_LOCK(CTG_READ, &dbCache->tbCache.stbLock);
D
dapan1121 已提交
558
  
D
dapan1121 已提交
559
  STableMeta **stbMeta = taosHashGet(dbCache->tbCache.stbCache, &tbMeta->suid, sizeof(tbMeta->suid));
D
dapan1121 已提交
560
  if (NULL == stbMeta || NULL == *stbMeta) {
D
dapan1121 已提交
561
    CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
D
dapan1121 已提交
562 563
    ctgReleaseDBCache(pCtg, dbCache);
    ctgError("stb not in stbCache, suid:%"PRIx64, tbMeta->suid);
D
dapan1121 已提交
564 565 566 567
    tfree(*pTableMeta);
    *exist = 0;
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
568

D
dapan1121 已提交
569
  if ((*stbMeta)->suid != tbMeta->suid) {    
D
dapan1121 已提交
570
    CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
D
dapan1121 已提交
571
    ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
572
    tfree(*pTableMeta);
D
dapan1121 已提交
573
    ctgError("stable suid in stbCache mis-match, expected suid:%"PRIx64 ",actual suid:%"PRIx64, tbMeta->suid, (*stbMeta)->suid);
D
dapan1121 已提交
574 575
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }
D
dapan1121 已提交
576

D
dapan1121 已提交
577
  int32_t metaSize = CTG_META_SIZE(*stbMeta);
D
dapan1121 已提交
578 579
  *pTableMeta = realloc(*pTableMeta, metaSize);
  if (NULL == *pTableMeta) {    
D
dapan1121 已提交
580
    CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
D
dapan1121 已提交
581
    ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
582
    ctgError("realloc size[%d] failed", metaSize);
D
dapan1121 已提交
583
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
584 585
  }

D
dapan1121 已提交
586 587
  memcpy(&(*pTableMeta)->sversion, &(*stbMeta)->sversion, metaSize - sizeof(SCTableMeta));

D
dapan1121 已提交
588
  CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
D
dapan1121 已提交
589

D
dapan1121 已提交
590
  ctgReleaseDBCache(pCtg, dbCache);
D
dapan 已提交
591

D
dapan1121 已提交
592
  ctgDebug("Got tbmeta from cache, dbFName:%s, tbName:%s", dbFName, pTableName->tname);
D
dapan1121 已提交
593 594 595 596
  
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
597
int32_t ctgGetTableTypeFromCache(SCatalog* pCtg, const SName* pTableName, int32_t *tbType, int32_t flag) {
D
dapan1121 已提交
598
  if (NULL == pCtg->dbCache) {
D
dapan1121 已提交
599
    ctgWarn("empty db cache, tbName:%s", pTableName->tname);  
D
dapan1121 已提交
600 601 602
    return TSDB_CODE_SUCCESS;
  }

D
dapan 已提交
603
  char dbFName[TSDB_DB_FNAME_LEN] = {0};
D
dapan1121 已提交
604 605 606 607 608 609
  if (CTG_IS_INF_DB(flag)) {
    strcpy(dbFName, pTableName->dbname);
  } else {
    tNameGetFullDbName(pTableName, dbFName);
  }
  
D
dapan 已提交
610 611
  SCtgDBCache *dbCache = NULL;
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
D
dapan1121 已提交
612 613 614
  if (NULL == dbCache) {
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
615

D
dapan1121 已提交
616 617 618
  CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);
  STableMeta *pTableMeta = (STableMeta *)taosHashAcquire(dbCache->tbCache.metaCache, pTableName->tname, strlen(pTableName->tname));

D
dapan1121 已提交
619
  if (NULL == pTableMeta) {
D
dapan1121 已提交
620
    CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
D
dapan 已提交
621 622
    ctgWarn("tbl not in cache, dbFName:%s, tbName:%s", dbFName, pTableName->tname);  
    ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
623
    
D
dapan1121 已提交
624 625 626
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
627 628
  *tbType = atomic_load_8(&pTableMeta->tableType);

D
dapan1121 已提交
629 630 631 632
  taosHashRelease(dbCache->tbCache.metaCache, pTableMeta);

  CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);

D
dapan 已提交
633
  ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
634

D
dapan 已提交
635
  ctgDebug("Got tbtype from cache, dbFName:%s, tbName:%s, type:%d", dbFName, pTableName->tname, *tbType);  
D
dapan1121 已提交
636 637 638 639
  
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
640
int32_t ctgGetTableMetaFromMnodeImpl(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, char *dbFName, char* tbName, STableMetaOutput* output) {
D
dapan1121 已提交
641
  SBuildTableMetaInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = tbName};
D
dapan1121 已提交
642 643 644 645
  char *msg = NULL;
  SEpSet *pVnodeEpSet = NULL;
  int32_t msgLen = 0;

D
dapan1121 已提交
646
  ctgDebug("try to get table meta from mnode, dbFName:%s, tbName:%s", dbFName, tbName);
D
dapan1121 已提交
647

D
dapan1121 已提交
648
  int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_META)](&bInput, &msg, 0, &msgLen);
D
dapan1121 已提交
649 650 651 652
  if (code) {
    ctgError("Build mnode stablemeta msg failed, code:%x", code);
    CTG_ERR_RET(code);
  }
D
dapan1121 已提交
653 654

  SRpcMsg rpcMsg = {
D
dapan1121 已提交
655
      .msgType = TDMT_MND_TABLE_META,
D
dapan1121 已提交
656 657 658
      .pCont   = msg,
      .contLen = msgLen,
  };
D
dapan1121 已提交
659

D
dapan1121 已提交
660 661
  SRpcMsg rpcRsp = {0};

D
dapan1121 已提交
662
  rpcSendRecv(pTrans, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
663 664
  
  if (TSDB_CODE_SUCCESS != rpcRsp.code) {
D
dapan1121 已提交
665
    if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) {
D
dapan1121 已提交
666
      SET_META_TYPE_NULL(output->metaType);
D
dapan1121 已提交
667
      ctgDebug("stablemeta not exist in mnode, dbFName:%s, tbName:%s", dbFName, tbName);
D
dapan1121 已提交
668 669 670
      return TSDB_CODE_SUCCESS;
    }
    
D
dapan1121 已提交
671
    ctgError("error rsp for stablemeta from mnode, code:%s, dbFName:%s, tbName:%s", tstrerror(rpcRsp.code), dbFName, tbName);
D
dapan1121 已提交
672 673 674
    CTG_ERR_RET(rpcRsp.code);
  }

D
dapan1121 已提交
675
  code = queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)](output, rpcRsp.pCont, rpcRsp.contLen);
D
dapan1121 已提交
676
  if (code) {
D
dapan1121 已提交
677
    ctgError("Process mnode stablemeta rsp failed, code:%x, dbFName:%s, tbName:%s", code, dbFName, tbName);
D
dapan1121 已提交
678 679 680
    CTG_ERR_RET(code);
  }

D
dapan1121 已提交
681
  ctgDebug("Got table meta from mnode, dbFName:%s, tbName:%s", dbFName, tbName);
D
dapan1121 已提交
682 683 684 685

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
686
int32_t ctgGetTableMetaFromMnode(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, STableMetaOutput* output) {
D
dapan1121 已提交
687 688
  char dbFName[TSDB_DB_FNAME_LEN];
  tNameGetFullDbName(pTableName, dbFName);
D
dapan1121 已提交
689

D
dapan1121 已提交
690
  return ctgGetTableMetaFromMnodeImpl(pCtg, pTrans, pMgmtEps, dbFName, (char *)pTableName->tname, output);
D
dapan1121 已提交
691
}
D
dapan1121 已提交
692

D
dapan1121 已提交
693 694
int32_t ctgGetTableMetaFromVnode(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) {
  if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pTableName || NULL == vgroupInfo || NULL == output) {
D
dapan1121 已提交
695
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
696 697
  }

D
dapan1121 已提交
698 699
  char dbFName[TSDB_DB_FNAME_LEN];
  tNameGetFullDbName(pTableName, dbFName);
D
dapan1121 已提交
700

D
dapan1121 已提交
701
  ctgDebug("try to get table meta from vnode, dbFName:%s, tbName:%s", dbFName, tNameGetTableName(pTableName));
D
dapan1121 已提交
702

D
dapan1121 已提交
703
  SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char *)tNameGetTableName(pTableName)};
D
dapan1121 已提交
704 705 706
  char *msg = NULL;
  int32_t msgLen = 0;

D
dapan1121 已提交
707 708
  int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)](&bInput, &msg, 0, &msgLen);
  if (code) {
D
dapan1121 已提交
709
    ctgError("Build vnode tablemeta msg failed, code:%x, dbFName:%s, tbName:%s", code, dbFName, tNameGetTableName(pTableName));
D
dapan1121 已提交
710 711
    CTG_ERR_RET(code);
  }
D
dapan1121 已提交
712 713

  SRpcMsg rpcMsg = {
H
Hongze Cheng 已提交
714
      .msgType = TDMT_VND_TABLE_META,
D
dapan1121 已提交
715 716 717 718 719
      .pCont   = msg,
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
D
dapan1121 已提交
720
  rpcSendRecv(pTrans, &vgroupInfo->epset, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
721
  
D
dapan1121 已提交
722
  if (TSDB_CODE_SUCCESS != rpcRsp.code) {
D
dapan1121 已提交
723
    if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) {
D
dapan1121 已提交
724
      SET_META_TYPE_NULL(output->metaType);
D
dapan1121 已提交
725
      ctgDebug("tablemeta not exist in vnode, dbFName:%s, tbName:%s", dbFName, tNameGetTableName(pTableName));
D
dapan1121 已提交
726 727 728
      return TSDB_CODE_SUCCESS;
    }
  
D
dapan1121 已提交
729
    ctgError("error rsp for table meta from vnode, code:%s, dbFName:%s, tbName:%s", tstrerror(rpcRsp.code), dbFName, tNameGetTableName(pTableName));
D
dapan1121 已提交
730
    CTG_ERR_RET(rpcRsp.code);
D
dapan1121 已提交
731 732
  }

D
dapan1121 已提交
733 734
  code = queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)](output, rpcRsp.pCont, rpcRsp.contLen);
  if (code) {
D
dapan1121 已提交
735
    ctgError("Process vnode tablemeta rsp failed, code:%s, dbFName:%s, tbName:%s", tstrerror(code), dbFName, tNameGetTableName(pTableName));
D
dapan1121 已提交
736 737 738
    CTG_ERR_RET(code);
  }

D
dapan1121 已提交
739
  ctgDebug("Got table meta from vnode, dbFName:%s, tbName:%s", dbFName, tNameGetTableName(pTableName));
D
dapan1121 已提交
740 741 742 743
  return TSDB_CODE_SUCCESS;
}


744 745
int32_t ctgGetHashFunction(int8_t hashMethod, tableNameHashFp *fp) {
  switch (hashMethod) {
D
dapan1121 已提交
746 747 748 749 750 751 752 753
    default:
      *fp = MurmurHash3_32;
      break;
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
754
int32_t ctgGenerateVgList(SCatalog *pCtg, SHashObj *vgHash, SArray** pList) {
D
dapan1121 已提交
755
  SHashObj *vgroupHash = NULL;
756
  SVgroupInfo *vgInfo = NULL;
D
dapan1121 已提交
757 758
  SArray *vgList = NULL;
  int32_t code = 0;
D
dapan1121 已提交
759
  int32_t vgNum = taosHashGetSize(vgHash);
760

D
dapan1121 已提交
761
  vgList = taosArrayInit(vgNum, sizeof(SVgroupInfo));
D
dapan1121 已提交
762
  if (NULL == vgList) {
D
dapan1121 已提交
763
    ctgError("taosArrayInit failed, num:%d", vgNum);
D
dapan 已提交
764 765 766
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);    
  }

D
dapan1121 已提交
767
  void *pIter = taosHashIterate(vgHash, NULL);
768 769
  while (pIter) {
    vgInfo = pIter;
D
dapan1121 已提交
770

D
dapan1121 已提交
771
    if (NULL == taosArrayPush(vgList, vgInfo)) {
D
dapan1121 已提交
772
      ctgError("taosArrayPush failed, vgId:%d", vgInfo->vgId);
D
dapan1121 已提交
773
      taosHashCancelIterate(vgHash, pIter);      
D
dapan1121 已提交
774
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
775 776
    }
    
D
dapan1121 已提交
777
    pIter = taosHashIterate(vgHash, pIter);
778
    vgInfo = NULL;
D
dapan1121 已提交
779 780
  }

D
dapan1121 已提交
781
  *pList = vgList;
D
dapan1121 已提交
782

D
dapan1121 已提交
783
  ctgDebug("Got vgList from cache, vgNum:%d", vgNum);
D
dapan1121 已提交
784

D
dapan1121 已提交
785
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
786 787 788 789 790 791 792 793

_return:

  if (vgList) {
    taosArrayDestroy(vgList);
  }

  CTG_RET(code);
D
dapan1121 已提交
794 795
}

D
dapan1121 已提交
796
int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup) {
D
dapan1121 已提交
797 798
  int32_t code = 0;
  
D
dapan1121 已提交
799
  int32_t vgNum = taosHashGetSize(dbInfo->vgHash);
H
Haojun Liao 已提交
800 801 802
  char db[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, db);

803
  if (vgNum <= 0) {
D
dapan1121 已提交
804
    ctgError("db vgroup cache invalid, db:%s, vgroup number:%d", db, vgNum);
D
dapan1121 已提交
805
    CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED);
D
dapan1121 已提交
806 807
  }

808 809
  tableNameHashFp fp = NULL;
  SVgroupInfo *vgInfo = NULL;
D
dapan1121 已提交
810

D
dapan1121 已提交
811
  CTG_ERR_RET(ctgGetHashFunction(dbInfo->hashMethod, &fp));
812 813

  char tbFullName[TSDB_TABLE_FNAME_LEN];
H
Haojun Liao 已提交
814
  tNameExtractFullName(pTableName, tbFullName);
815 816 817

  uint32_t hashValue = (*fp)(tbFullName, (uint32_t)strlen(tbFullName));

D
dapan1121 已提交
818
  void *pIter = taosHashIterate(dbInfo->vgHash, NULL);
819 820 821
  while (pIter) {
    vgInfo = pIter;
    if (hashValue >= vgInfo->hashBegin && hashValue <= vgInfo->hashEnd) {
D
dapan1121 已提交
822
      taosHashCancelIterate(dbInfo->vgHash, pIter);
823
      break;
D
dapan1121 已提交
824
    }
825
    
D
dapan1121 已提交
826
    pIter = taosHashIterate(dbInfo->vgHash, pIter);
827
    vgInfo = NULL;
D
dapan1121 已提交
828 829
  }

830
  if (NULL == vgInfo) {
D
dapan1121 已提交
831 832
    ctgError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, db, taosHashGetSize(dbInfo->vgHash));
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
833 834 835 836
  }

  *pVgroup = *vgInfo;

837
  CTG_RET(code);
D
dapan1121 已提交
838 839
}

D
dapan1121 已提交
840
int32_t ctgStbVersionCompare(const void* key1, const void* key2) {
D
dapan 已提交
841
  if (*(uint64_t *)key1 < ((SSTableMetaVersion*)key2)->suid) {
D
dapan1121 已提交
842
    return -1;
D
dapan 已提交
843
  } else if (*(uint64_t *)key1 > ((SSTableMetaVersion*)key2)->suid) {
D
dapan1121 已提交
844 845 846 847 848 849 850
    return 1;
  } else {
    return 0;
  }
}

int32_t ctgDbVgVersionCompare(const void* key1, const void* key2) {
D
dapan1121 已提交
851
  if (*(int64_t *)key1 < ((SDbVgVersion*)key2)->dbId) {
D
dapan1121 已提交
852
    return -1;
D
dapan1121 已提交
853
  } else if (*(int64_t *)key1 > ((SDbVgVersion*)key2)->dbId) {
D
dapan1121 已提交
854 855 856
    return 1;
  } else {
    return 0;
D
dapan1121 已提交
857
  }
D
dapan1121 已提交
858 859
}

D
dapan1121 已提交
860
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) {
D
dapan1121 已提交
861 862 863 864
  mgmt->slotRIdx = 0;
  mgmt->slotNum = rentSec / CTG_RENT_SLOT_SECOND;
  mgmt->type = type;

D
dapan1121 已提交
865
  size_t msgSize = sizeof(SCtgRentSlot) * mgmt->slotNum;
D
dapan1121 已提交
866
  
D
dapan1121 已提交
867 868 869
  mgmt->slots = calloc(1, msgSize);
  if (NULL == mgmt->slots) {
    qError("calloc %d failed", (int32_t)msgSize);
D
dapan 已提交
870
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
871
  }
D
dapan1121 已提交
872

D
dapan1121 已提交
873 874 875 876
  qDebug("meta rent initialized, type:%d, slotNum:%d", type, mgmt->slotNum);
  
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
877

D
dapan1121 已提交
878

D
dapan1121 已提交
879
int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size) {
D
dapan1121 已提交
880
  int16_t widx = abs(id % mgmt->slotNum);
D
dapan1121 已提交
881

D
dapan1121 已提交
882
  SCtgRentSlot *slot = &mgmt->slots[widx];
D
dapan1121 已提交
883 884 885 886 887 888 889 890
  int32_t code = 0;
  
  CTG_LOCK(CTG_WRITE, &slot->lock);
  if (NULL == slot->meta) {
    slot->meta = taosArrayInit(CTG_DEFAULT_RENT_SLOT_SIZE, size);
    if (NULL == slot->meta) {
      qError("taosArrayInit %d failed, id:%"PRIx64", slot idx:%d, type:%d", CTG_DEFAULT_RENT_SLOT_SIZE, id, widx, mgmt->type);
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
891
    }
D
dapan1121 已提交
892
  }
D
dapan1121 已提交
893

D
dapan1121 已提交
894 895 896
  if (NULL == taosArrayPush(slot->meta, meta)) {
    qError("taosArrayPush meta to rent failed, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
897 898
  }

D
dapan1121 已提交
899
  slot->needSort = true;
D
dapan1121 已提交
900

D
dapan1121 已提交
901
  qDebug("add meta to rent, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
D
dapan1121 已提交
902

D
dapan1121 已提交
903 904 905 906 907 908
_return:

  CTG_UNLOCK(CTG_WRITE, &slot->lock);
  CTG_RET(code);
}

D
dapan1121 已提交
909
int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t compare) {
D
dapan1121 已提交
910
  int16_t widx = abs(id % mgmt->slotNum);
D
dapan1121 已提交
911

D
dapan1121 已提交
912
  SCtgRentSlot *slot = &mgmt->slots[widx];
D
dapan1121 已提交
913 914 915 916
  int32_t code = 0;
  
  CTG_LOCK(CTG_WRITE, &slot->lock);
  if (NULL == slot->meta) {
D
dapan1121 已提交
917 918
    qError("empty meta slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
919 920 921
  }

  if (slot->needSort) {
D
dapan1121 已提交
922
    qDebug("meta slot before sorte, slot idx:%d, type:%d, size:%d", widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta));
D
dapan1121 已提交
923 924
    taosArraySort(slot->meta, compare);
    slot->needSort = false;
D
dapan1121 已提交
925
    qDebug("meta slot sorted, slot idx:%d, type:%d, size:%d", widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta));
D
dapan1121 已提交
926 927 928
  }

  void *orig = taosArraySearch(slot->meta, &id, compare, TD_EQ);
D
dapan1121 已提交
929
  if (NULL == orig) {
D
dapan1121 已提交
930
    qError("meta not found in slot, id:%"PRIx64", slot idx:%d, type:%d, size:%d", id, widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta));
D
dapan1121 已提交
931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  memcpy(orig, meta, size);

  qDebug("meta in rent updated, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);

_return:

  CTG_UNLOCK(CTG_WRITE, &slot->lock);

  if (code) {
    qWarn("meta in rent update failed, will try to add it, code:%x, id:%"PRIx64", slot idx:%d, type:%d", code, id, widx, mgmt->type);
    CTG_RET(ctgMetaRentAdd(mgmt, meta, id, size));
  }

  CTG_RET(code);
}

D
dapan1121 已提交
950
int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t compare) {
D
dapan1121 已提交
951 952
  int16_t widx = abs(id % mgmt->slotNum);

D
dapan1121 已提交
953
  SCtgRentSlot *slot = &mgmt->slots[widx];
D
dapan1121 已提交
954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985
  int32_t code = 0;
  
  CTG_LOCK(CTG_WRITE, &slot->lock);
  if (NULL == slot->meta) {
    qError("empty meta slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  if (slot->needSort) {
    taosArraySort(slot->meta, compare);
    slot->needSort = false;
    qDebug("meta slot sorted, slot idx:%d, type:%d", widx, mgmt->type);
  }

  int32_t idx = taosArraySearchIdx(slot->meta, &id, compare, TD_EQ);
  if (idx < 0) {
    qError("meta not found in slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  taosArrayRemove(slot->meta, idx);

  qDebug("meta in rent removed, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);

_return:

  CTG_UNLOCK(CTG_WRITE, &slot->lock);

  CTG_RET(code);
}


D
dapan1121 已提交
986
int32_t ctgMetaRentGetImpl(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
D
dapan1121 已提交
987 988 989 990
  int16_t ridx = atomic_add_fetch_16(&mgmt->slotRIdx, 1);
  if (ridx >= mgmt->slotNum) {
    ridx %= mgmt->slotNum;
    atomic_store_16(&mgmt->slotRIdx, ridx);
D
dapan1121 已提交
991
  }
D
dapan1121 已提交
992

D
dapan1121 已提交
993
  SCtgRentSlot *slot = &mgmt->slots[ridx];
D
dapan1121 已提交
994
  int32_t code = 0;
D
dapan1121 已提交
995
  
D
dapan1121 已提交
996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031
  CTG_LOCK(CTG_READ, &slot->lock);
  if (NULL == slot->meta) {
    qDebug("empty meta in slot:%d, type:%d", ridx, mgmt->type);
    *num = 0;
    goto _return;
  }

  size_t metaNum = taosArrayGetSize(slot->meta);
  if (metaNum <= 0) {
    qDebug("no meta in slot:%d, type:%d", ridx, mgmt->type);
    *num = 0;
    goto _return;
  }

  size_t msize = metaNum * size;
  *res = malloc(msize);
  if (NULL == *res) {
    qError("malloc %d failed", (int32_t)msize);
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
  }

  void *meta = taosArrayGet(slot->meta, 0);

  memcpy(*res, meta, msize);

  *num = (uint32_t)metaNum;

  qDebug("Got %d meta from rent, type:%d", (int32_t)metaNum, mgmt->type);

_return:

  CTG_UNLOCK(CTG_READ, &slot->lock);

  CTG_RET(code);
}

D
dapan1121 已提交
1032
int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
D
dapan1121 已提交
1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051
  while (true) {
    int64_t msec = taosGetTimestampMs();
    int64_t lsec = atomic_load_64(&mgmt->lastReadMsec);
    if ((msec - lsec) < CTG_RENT_SLOT_SECOND * 1000) {
      *res = NULL;
      *num = 0;
      qDebug("too short time period to get expired meta, type:%d", mgmt->type);
      return TSDB_CODE_SUCCESS;
    }

    if (lsec != atomic_val_compare_exchange_64(&mgmt->lastReadMsec, lsec, msec)) {
      continue;
    }

    break;
  }

  CTG_ERR_RET(ctgMetaRentGetImpl(mgmt, res, num, size));

D
dapan1121 已提交
1052 1053 1054
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1055
int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
D
dapan1121 已提交
1056
  int32_t code = 0;
D
dapan1121 已提交
1057

D
dapan1121 已提交
1058 1059 1060
  SCtgDBCache newDBCache = {0};
  newDBCache.dbId = dbId;

D
dapan 已提交
1061
  newDBCache.tbCache.metaCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
D
dapan1121 已提交
1062
  if (NULL == newDBCache.tbCache.metaCache) {
D
dapan 已提交
1063
    ctgError("taosHashInit %d metaCache failed", gCtgMgmt.cfg.maxTblCacheNum);
D
dapan1121 已提交
1064 1065 1066
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
  }

D
dapan 已提交
1067
  newDBCache.tbCache.stbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
D
dapan1121 已提交
1068
  if (NULL == newDBCache.tbCache.stbCache) {
D
dapan 已提交
1069
    ctgError("taosHashInit %d stbCache failed", gCtgMgmt.cfg.maxTblCacheNum);
D
dapan1121 已提交
1070 1071 1072 1073
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
  }

  code = taosHashPut(pCtg->dbCache, dbFName, strlen(dbFName), &newDBCache, sizeof(SCtgDBCache));
D
dapan1121 已提交
1074 1075 1076
  if (code) {
    if (HASH_NODE_EXIST(code)) {
      ctgDebug("db already in cache, dbFName:%s", dbFName);
D
dapan1121 已提交
1077
      goto _return;
D
dapan1121 已提交
1078 1079 1080
    }
    
    ctgError("taosHashPut db to cache failed, dbFName:%s", dbFName);
D
dapan1121 已提交
1081 1082 1083
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
  }
  
D
dapan1121 已提交
1084
  SDbVgVersion vgVersion = {.dbId = newDBCache.dbId, .vgVersion = -1};
D
dapan1121 已提交
1085 1086
  strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));

D
dapan1121 已提交
1087
  ctgDebug("db added to cache, dbFName:%s, dbId:%"PRIx64, dbFName, dbId);
D
dapan1121 已提交
1088

D
dapan1121 已提交
1089 1090 1091 1092
  if (CTG_IS_INF_DBNAME(dbFName)) {
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
1093 1094 1095
  CTG_ERR_RET(ctgMetaRentAdd(&pCtg->dbRent, &vgVersion, dbId, sizeof(SDbVgVersion)));

  ctgDebug("db added to rent, dbFName:%s, vgVersion:%d, dbId:%"PRIx64, dbFName, vgVersion.vgVersion, dbId);
D
dapan1121 已提交
1096

D
dapan1121 已提交
1097
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1098

D
dapan1121 已提交
1099
_return:
D
dapan1121 已提交
1100

D
dapan1121 已提交
1101
  ctgFreeDbCache(&newDBCache);
D
dapan1121 已提交
1102

D
dapan1121 已提交
1103 1104
  CTG_RET(code);
}
D
dapan1121 已提交
1105

D
dapan1121 已提交
1106

D
dapan1121 已提交
1107
void ctgRemoveStbRent(SCatalog* pCtg, SCtgTbMetaCache *cache) {
D
dapan1121 已提交
1108 1109 1110 1111 1112 1113 1114
  CTG_LOCK(CTG_WRITE, &cache->stbLock);
  if (cache->stbCache) {
    void *pIter = taosHashIterate(cache->stbCache, NULL);
    while (pIter) {
      uint64_t *suid = NULL;
      taosHashGetKey(pIter, (void **)&suid, NULL);

D
dapan1121 已提交
1115
      if (TSDB_CODE_SUCCESS == ctgMetaRentRemove(&pCtg->stbRent, *suid, ctgStbVersionCompare)) {
D
dapan1121 已提交
1116 1117 1118 1119 1120 1121 1122 1123 1124 1125
        ctgDebug("stb removed from rent, suid:%"PRIx64, *suid);
      }
          
      pIter = taosHashIterate(cache->stbCache, pIter);
    }
  }
  CTG_UNLOCK(CTG_WRITE, &cache->stbLock);
}


D
dapan1121 已提交
1126
int32_t ctgRemoveDB(SCatalog* pCtg, SCtgDBCache *dbCache, const char* dbFName) {
D
dapan 已提交
1127 1128 1129 1130
  uint64_t dbId = dbCache->dbId;
  
  ctgInfo("start to remove db from cache, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId);

D
dapan1121 已提交
1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143
  atomic_store_8(&dbCache->deleted, 1);

  ctgRemoveStbRent(pCtg, &dbCache->tbCache);

  ctgFreeDbCache(dbCache);

  ctgInfo("db removed from cache, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId);

  CTG_ERR_RET(ctgMetaRentRemove(&pCtg->dbRent, dbCache->dbId, ctgDbVgVersionCompare));
  
  ctgDebug("db removed from rent, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId);

  if (taosHashRemove(pCtg->dbCache, dbFName, strlen(dbFName))) {
D
dapan1121 已提交
1144 1145
    ctgInfo("taosHashRemove from dbCache failed, may be removed, dbFName:%s", dbFName);
    CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
D
dapan1121 已提交
1146
  }
D
dapan 已提交
1147 1148

  ctgInfo("db removed from cache, dbFName:%s, dbId:%"PRIx64, dbFName, dbId);
D
dapan1121 已提交
1149 1150 1151
  
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
1152 1153


D
dapan1121 已提交
1154
int32_t ctgGetAddDBCache(SCatalog* pCtg, const char *dbFName, uint64_t dbId, SCtgDBCache **pCache) {
D
dapan1121 已提交
1155 1156
  int32_t code = 0;
  SCtgDBCache *dbCache = NULL;
D
dapan1121 已提交
1157
  ctgGetDBCache(pCtg, dbFName, &dbCache);
D
dapan1121 已提交
1158
  
D
dapan1121 已提交
1159 1160
  if (dbCache) {
  // TODO OPEN IT
D
dapan1121 已提交
1161
#if 0    
D
dapan1121 已提交
1162 1163 1164 1165
    if (dbCache->dbId == dbId) {
      *pCache = dbCache;
      return TSDB_CODE_SUCCESS;
    }
D
dapan1121 已提交
1166
#else
D
dapan1121 已提交
1167 1168 1169
    if (0 == dbId) {
      *pCache = dbCache;
      return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1170 1171
    }

D
dapan1121 已提交
1172 1173 1174 1175 1176
    if (dbId && (dbCache->dbId == 0)) {
      dbCache->dbId = dbId;
      *pCache = dbCache;
      return TSDB_CODE_SUCCESS;
    }
D
dapan1121 已提交
1177
    
D
dapan1121 已提交
1178 1179 1180 1181 1182 1183
    if (dbCache->dbId == dbId) {
      *pCache = dbCache;
      return TSDB_CODE_SUCCESS;
    }
#endif
    CTG_ERR_RET(ctgRemoveDB(pCtg, dbCache, dbFName));
D
dapan1121 已提交
1184
  }
D
dapan1121 已提交
1185 1186
  
  CTG_ERR_RET(ctgAddNewDBCache(pCtg, dbFName, dbId));
D
dapan1121 已提交
1187

D
dapan1121 已提交
1188
  ctgGetDBCache(pCtg, dbFName, &dbCache);
D
dapan1121 已提交
1189

D
dapan1121 已提交
1190
  *pCache = dbCache;
D
dapan1121 已提交
1191

D
dapan1121 已提交
1192
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1193 1194 1195
}


D
dapan1121 已提交
1196 1197 1198 1199 1200 1201
int32_t ctgUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SDBVgInfo** pDbInfo) {
  int32_t code = 0;
  SDBVgInfo* dbInfo = *pDbInfo;
  
  if (NULL == dbInfo->vgHash || dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) {
    ctgError("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d", dbFName, dbInfo->vgHash, dbInfo->vgVersion);
D
dapan1121 已提交
1202
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
1203 1204
  }

D
dapan1121 已提交
1205
  bool newAdded = false;
D
dapan1121 已提交
1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216
  SDbVgVersion vgVersion = {.dbId = dbId, .vgVersion = dbInfo->vgVersion};

  SCtgDBCache *dbCache = NULL;
  CTG_ERR_RET(ctgGetAddDBCache(pCtg, dbFName, dbId, &dbCache));
  if (NULL == dbCache) {
    ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:%"PRIx64, dbFName, dbId);
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  SDBVgInfo *vgInfo = NULL;
  CTG_ERR_RET(ctgWAcquireVgInfo(pCtg, dbCache));
D
dapan1121 已提交
1217
  
D
dapan1121 已提交
1218 1219 1220 1221
  if (dbCache->vgInfo) {
    if (dbInfo->vgVersion <= dbCache->vgInfo->vgVersion) {
      ctgInfo("db vgVersion is old, dbFName:%s, vgVersion:%d, currentVersion:%d", dbFName, dbInfo->vgVersion, dbCache->vgInfo->vgVersion);
      ctgWReleaseVgInfo(dbCache);
D
dapan1121 已提交
1222
      
D
dapan1121 已提交
1223
      return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1224
    }
D
dapan1121 已提交
1225 1226

    ctgFreeVgInfo(dbCache->vgInfo);
D
dapan1121 已提交
1227 1228
  }

D
dapan1121 已提交
1229
  dbCache->vgInfo = dbInfo;
D
dapan1121 已提交
1230

D
dapan1121 已提交
1231
  *pDbInfo = NULL;
D
dapan1121 已提交
1232

D
dapan1121 已提交
1233
  ctgDebug("db vgInfo updated, dbFName:%s, vgVersion:%d, dbId:%"PRIx64, dbFName, vgVersion.vgVersion, vgVersion.dbId);
D
dapan1121 已提交
1234

D
dapan1121 已提交
1235
  ctgWReleaseVgInfo(dbCache);
D
dapan1121 已提交
1236

D
dapan1121 已提交
1237
  dbCache = NULL;
D
dapan1121 已提交
1238

D
dapan1121 已提交
1239 1240 1241
  strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
  CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion), ctgDbVgVersionCompare));
  
D
dapan1121 已提交
1242 1243 1244 1245
  CTG_RET(code);
}


D
dapan1121 已提交
1246 1247
int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, uint64_t dbId, char *tbName, STableMeta *meta, int32_t metaSize) {
  SCtgTbMetaCache *tbCache = &dbCache->tbCache;
D
dapan1121 已提交
1248

D
dapan1121 已提交
1249 1250 1251 1252 1253
  CTG_LOCK(CTG_READ, &tbCache->metaLock);
  if (dbCache->deleted || NULL == tbCache->metaCache || NULL == tbCache->stbCache) {
    CTG_UNLOCK(CTG_READ, &tbCache->metaLock);    
    ctgError("db is dropping, dbId:%"PRIx64, dbCache->dbId);
    CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
D
dapan1121 已提交
1254 1255
  }

D
dapan1121 已提交
1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269
  int8_t origType = 0;
  uint64_t origSuid = 0;
  bool isStb = meta->tableType == TSDB_SUPER_TABLE;
  STableMeta *orig = taosHashGet(tbCache->metaCache, tbName, strlen(tbName));
  if (orig) {
    origType = orig->tableType;
    origSuid = orig->suid;
    
    if (origType == TSDB_SUPER_TABLE && ((!isStb) || origSuid != meta->suid)) {
      CTG_LOCK(CTG_WRITE, &tbCache->stbLock);
      if (taosHashRemove(tbCache->stbCache, &orig->suid, sizeof(orig->suid))) {
        ctgError("stb not exist in stbCache, dbFName:%s, stb:%s, suid:%"PRIx64, dbFName, tbName, orig->suid);
      }
      CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
D
dapan1121 已提交
1270

D
dapan1121 已提交
1271 1272
      ctgDebug("stb removed from stbCache, dbFName:%s, stb:%s, suid:%"PRIx64, dbFName, tbName, orig->suid);
      
D
dapan1121 已提交
1273
      ctgMetaRentRemove(&pCtg->stbRent, orig->suid, ctgStbVersionCompare);
D
dapan1121 已提交
1274
    }
D
dapan1121 已提交
1275
  }
D
dapan1121 已提交
1276

D
dapan1121 已提交
1277 1278
  if (isStb) {
    CTG_LOCK(CTG_WRITE, &tbCache->stbLock);
D
dapan1121 已提交
1279
  }
D
dapan1121 已提交
1280
  
D
dapan1121 已提交
1281 1282 1283 1284 1285 1286 1287 1288 1289
  if (taosHashPut(tbCache->metaCache, tbName, strlen(tbName), meta, metaSize) != 0) {
    if (isStb) {
      CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
    }
    
    CTG_UNLOCK(CTG_READ, &tbCache->metaLock);  
    ctgError("taosHashPut tbmeta to cache failed, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
  }
D
dapan1121 已提交
1290

D
dapan 已提交
1291 1292
  ctgDebug("tbmeta updated to cache, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);

D
dapan1121 已提交
1293 1294 1295
  if (!isStb) {
    CTG_UNLOCK(CTG_READ, &tbCache->metaLock);  
    return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1296
  }
D
dapan1121 已提交
1297

D
dapan1121 已提交
1298 1299 1300 1301 1302
  if (isStb && origSuid == meta->suid) {
    CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
    CTG_UNLOCK(CTG_READ, &tbCache->metaLock);  
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
1303

D
dapan1121 已提交
1304 1305 1306 1307 1308 1309
  STableMeta *tbMeta = taosHashGet(tbCache->metaCache, tbName, strlen(tbName));
  if (taosHashPut(tbCache->stbCache, &meta->suid, sizeof(meta->suid), &tbMeta, POINTER_BYTES) != 0) {
    CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
    CTG_UNLOCK(CTG_READ, &tbCache->metaLock);    
    ctgError("taosHashPutExt stable to stable cache failed, suid:%"PRIx64, meta->suid);
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
1310
  }
D
dapan1121 已提交
1311 1312
  
  CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
D
dapan1121 已提交
1313

D
dapan1121 已提交
1314 1315
  CTG_UNLOCK(CTG_READ, &tbCache->metaLock);

D
dapan 已提交
1316
  ctgDebug("stb updated to stbCache, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
D
dapan1121 已提交
1317 1318 1319 1320 1321 1322 1323

  SSTableMetaVersion metaRent = {.dbId = dbId, .suid = meta->suid, .sversion = meta->sversion, .tversion = meta->tversion};
  strcpy(metaRent.dbFName, dbFName);
  strcpy(metaRent.stbName, tbName);
  CTG_ERR_RET(ctgMetaRentAdd(&pCtg->stbRent, &metaRent, metaRent.suid, sizeof(SSTableMetaVersion)));
  
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1324 1325
}

D
dapan 已提交
1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 1337 1338 1339 1340 1341 1342 1343 1344 1345 1346 1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357 1358 1359 1360 1361 1362 1363 1364
int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst) {
  *dst = malloc(sizeof(SDBVgInfo));
  if (NULL == *dst) {
    qError("malloc %d failed", (int32_t)sizeof(SDBVgInfo));
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
  }

  memcpy(*dst, src, sizeof(SDBVgInfo));

  size_t hashSize = taosHashGetSize(src->vgHash);
  (*dst)->vgHash = taosHashInit(hashSize, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (NULL == (*dst)->vgHash) {
    qError("taosHashInit %d failed", (int32_t)hashSize);
    tfree(*dst);
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
  }

  int32_t *vgId = NULL;
  void *pIter = taosHashIterate(src->vgHash, NULL);
  while (pIter) {
    taosHashGetKey(pIter, (void **)&vgId, NULL);

    if (taosHashPut((*dst)->vgHash, (void *)vgId, sizeof(int32_t), pIter, sizeof(SVgroupInfo))) {
      qError("taosHashPut failed, hashSize:%d", (int32_t)hashSize);
      taosHashCancelIterate(src->vgHash, pIter);
      taosHashCleanup((*dst)->vgHash);
      tfree(*dst);
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }
    
    pIter = taosHashIterate(src->vgHash, pIter);
  }


  return TSDB_CODE_SUCCESS;
}



D
dapan1121 已提交
1365
int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, bool forceUpdate, SCtgDBCache** dbCache, SDBVgInfo **pInfo) {
D
dapan1121 已提交
1366
  bool inCache = false;
D
dapan1121 已提交
1367
  int32_t code = 0;
1368
  if (!forceUpdate) {
D
dapan1121 已提交
1369
    CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, dbCache, &inCache));
D
dapan1121 已提交
1370
    if (inCache) {
D
dapan1121 已提交
1371 1372 1373 1374 1375 1376 1377
      return TSDB_CODE_SUCCESS;
    }
  }

  SUseDbOutput DbOut = {0};
  SBuildUseDBInput input = {0};

D
dapan1121 已提交
1378
  tstrncpy(input.db, dbFName, tListLen(input.db));
D
dapan1121 已提交
1379
  input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
H
Haojun Liao 已提交
1380

D
dapan 已提交
1381
  CTG_ERR_RET(ctgGetDBVgInfoFromMnode(pCtg, pRpc, pMgmtEps, &input, &DbOut));
D
dapan1121 已提交
1382

D
dapan 已提交
1383 1384 1385 1386 1387 1388 1389 1390
  CTG_ERR_JRET(ctgCloneVgInfo(DbOut.dbVgroup, pInfo));
  
  SCtgMetaAction action= {.act = CTG_ACT_UPDATE_VG};
  SCtgUpdateVgMsg *msg = malloc(sizeof(SCtgUpdateVgMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg));
    ctgFreeVgInfo(DbOut.dbVgroup);
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
1391 1392
  }

D
dapan 已提交
1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403
  strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  msg->pCtg = pCtg;
  msg->dbId = DbOut.dbId;
  msg->dbInfo = DbOut.dbVgroup;

  action.data = msg;

  CTG_ERR_JRET(ctgPushAction(&action));

  ctgDebug("action [%s] added into queue", gCtgAction[action.act].name);

D
dapan1121 已提交
1404
  return TSDB_CODE_SUCCESS;
D
dapan 已提交
1405 1406 1407 1408 1409 1410 1411 1412 1413

_return:

  tfree(*pInfo);
  tfree(msg);
  
  *pInfo = DbOut.dbVgroup;
  
  CTG_RET(code);
D
dapan1121 已提交
1414 1415
}

D
dapan 已提交
1416

D
dapan1121 已提交
1417 1418 1419 1420 1421
int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput) {
  *pOutput = malloc(sizeof(STableMetaOutput));
  if (NULL == *pOutput) {
    qError("malloc %d failed", (int32_t)sizeof(STableMetaOutput));
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan 已提交
1422 1423
  }

D
dapan1121 已提交
1424 1425 1426 1427 1428 1429
  memcpy(*pOutput, output, sizeof(STableMetaOutput));

  if (output->tbMeta) {
    int32_t metaSize = CTG_META_SIZE(output->tbMeta);
    (*pOutput)->tbMeta = malloc(metaSize);
    if (NULL == (*pOutput)->tbMeta) {
D
dapan 已提交
1430
      qError("malloc %d failed", (int32_t)sizeof(STableMetaOutput));
D
dapan1121 已提交
1431 1432 1433 1434 1435
      tfree(*pOutput);
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }

    memcpy((*pOutput)->tbMeta, output->tbMeta, metaSize);
D
dapan 已提交
1436 1437
  }

D
dapan1121 已提交
1438 1439 1440
  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
1441 1442


D
dapan1121 已提交
1443 1444
int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, int32_t flag, STableMetaOutput **pOutput) {
  if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pTableName) {
D
dapan1121 已提交
1445 1446 1447 1448 1449 1450
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  SVgroupInfo vgroupInfo = {0};
  int32_t code = 0;

D
dapan1121 已提交
1451 1452 1453
  if (!CTG_IS_INF_DB(flag)) {
    CTG_ERR_RET(catalogGetTableHashVgroup(pCtg, pTrans, pMgmtEps, pTableName, &vgroupInfo));
  }
D
dapan1121 已提交
1454

D
dapan 已提交
1455
  SCtgUpdateTblMsg *msg = NULL;
D
dapan1121 已提交
1456
  STableMetaOutput  moutput = {0};
S
Shengliang Guan 已提交
1457
  STableMetaOutput *output = calloc(1, sizeof(STableMetaOutput));
D
dapan1121 已提交
1458 1459 1460 1461
  if (NULL == output) {
    ctgError("malloc %d failed", (int32_t)sizeof(STableMetaOutput));
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
  }
D
dapan1121 已提交
1462 1463 1464 1465 1466 1467

  if (CTG_IS_INF_DB(flag)) {
    ctgDebug("will refresh tbmeta, supposed in information_schema, tbName:%s", tNameGetTableName(pTableName));

    CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCtg, pTrans, pMgmtEps, (char *)pTableName->dbname, (char *)pTableName->tname, output));
  } else if (CTG_IS_STB(flag)) {
D
dapan 已提交
1468
    ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s", tNameGetTableName(pTableName));
D
dapan1121 已提交
1469 1470

    // if get from mnode failed, will not try vnode
D
dapan1121 已提交
1471
    CTG_ERR_JRET(ctgGetTableMetaFromMnode(pCtg, pTrans, pMgmtEps, pTableName, output));
D
dapan1121 已提交
1472

D
dapan1121 已提交
1473
    if (CTG_IS_META_NULL(output->metaType)) {
D
dapan1121 已提交
1474
      CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCtg, pTrans, pMgmtEps, pTableName, &vgroupInfo, output));
D
dapan1121 已提交
1475 1476
    }
  } else {
D
dapan1121 已提交
1477
    ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pTableName), flag);
D
dapan1121 已提交
1478 1479

    // if get from vnode failed or no table meta, will not try mnode
D
dapan1121 已提交
1480
    CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCtg, pTrans, pMgmtEps, pTableName, &vgroupInfo, output));
D
dapan1121 已提交
1481

D
dapan1121 已提交
1482
    if (CTG_IS_META_TABLE(output->metaType) && TSDB_SUPER_TABLE == output->tbMeta->tableType) {
D
dapan 已提交
1483
      ctgDebug("will continue to refresh tbmeta since got stb, tbName:%s, metaType:%d", tNameGetTableName(pTableName), output->metaType);
D
dapan1121 已提交
1484

D
dapan1121 已提交
1485
      tfree(output->tbMeta);
D
dapan1121 已提交
1486
      
D
dapan1121 已提交
1487
      CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCtg, pTrans, pMgmtEps, output->dbFName, output->tbName, output));
D
dapan1121 已提交
1488
    } else if (CTG_IS_META_BOTH(output->metaType)) {
D
dapan1121 已提交
1489
      int32_t exist = 0;
D
dapan1121 已提交
1490
      CTG_ERR_JRET(ctgIsTableMetaExistInCache(pCtg, output->dbFName, output->tbName, &exist));
D
dapan1121 已提交
1491
      if (0 == exist) {
D
dapan1121 已提交
1492
        CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCtg, pTrans, pMgmtEps, output->dbFName, output->tbName, &moutput));
D
dapan1121 已提交
1493

D
dapan1121 已提交
1494
        if (CTG_IS_META_NULL(moutput.metaType)) {
D
dapan1121 已提交
1495
          SET_META_TYPE_NULL(output->metaType);
D
dapan1121 已提交
1496 1497
        }
        
D
dapan1121 已提交
1498 1499
        tfree(output->tbMeta);
        output->tbMeta = moutput.tbMeta;
D
dapan1121 已提交
1500 1501
        moutput.tbMeta = NULL;
      } else {
D
dapan1121 已提交
1502
        tfree(output->tbMeta);
D
dapan1121 已提交
1503
        
D
dapan1121 已提交
1504
        SET_META_TYPE_CTABLE(output->metaType); 
D
dapan1121 已提交
1505
      }
D
dapan1121 已提交
1506 1507 1508
    }
  }

D
dapan1121 已提交
1509
  if (CTG_IS_META_NULL(output->metaType)) {
D
dapan 已提交
1510
    ctgError("no tbmeta got, tbNmae:%s", tNameGetTableName(pTableName));
D
dapan1121 已提交
1511 1512 1513
    CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
  }

D
dapan 已提交
1514 1515 1516 1517 1518 1519
  if (CTG_IS_META_TABLE(output->metaType)) {
    ctgDebug("tbmeta got, dbFName:%s, tbName:%s, tbType:%d", output->dbFName, output->tbName, output->tbMeta->tableType);
  } else {
    ctgDebug("tbmeta got, dbFName:%s, tbName:%s, tbType:%d, stbMetaGot:%d", output->dbFName, output->ctbName, output->ctbMeta.tableType, CTG_IS_META_BOTH(output->metaType));
  }

D
dapan1121 已提交
1520 1521 1522 1523 1524
  if (pOutput) {
    CTG_ERR_JRET(ctgCloneMetaOutput(output, pOutput));
  }

  SCtgMetaAction action= {.act = CTG_ACT_UPDATE_TBL};
D
dapan 已提交
1525
  msg = malloc(sizeof(SCtgUpdateTblMsg));
D
dapan1121 已提交
1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536 1537
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTblMsg));
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
  }

  msg->pCtg = pCtg;
  msg->output = output;

  action.data = msg;

  CTG_ERR_JRET(ctgPushAction(&action));

D
dapan 已提交
1538 1539
  ctgDebug("action [%s] added into queue", gCtgAction[action.act].name);

D
dapan1121 已提交
1540
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1541 1542 1543

_return:

D
dapan1121 已提交
1544
  tfree(output->tbMeta);
D
dapan 已提交
1545
  tfree(output);
D
dapan1121 已提交
1546
  tfree(msg);
D
dapan1121 已提交
1547 1548 1549 1550
  
  CTG_RET(code);
}

D
dapan1121 已提交
1551
int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, bool forceUpdate, STableMeta** pTableMeta, int32_t flag) {
D
dapan1121 已提交
1552
  if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) {
D
dapan1121 已提交
1553 1554 1555 1556
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }
  
  int32_t exist = 0;
D
dapan1121 已提交
1557
  int32_t code = 0;
D
dapan1121 已提交
1558

D
dapan1121 已提交
1559 1560 1561 1562 1563 1564
  if (CTG_IS_INF_DBNAME(pTableName->dbname)) {
    CTG_SET_INF_DB(flag);
  }

  if ((!forceUpdate) || (CTG_IS_INF_DB(flag))) {
    CTG_ERR_RET(ctgGetTableMetaFromCache(pCtg, pTableName, pTableMeta, &exist, flag));
D
dapan1121 已提交
1565

D
dapan1121 已提交
1566
    if (exist && CTG_TBTYPE_MATCH(flag, (*pTableMeta)->tableType)) {
D
dapan1121 已提交
1567 1568
      return TSDB_CODE_SUCCESS;
    }
D
dapan1121 已提交
1569 1570

    tfree(*pTableMeta);
D
dapan1121 已提交
1571
  } else if (CTG_IS_UNKNOWN_STB(flag)) {
D
dapan1121 已提交
1572 1573
    int32_t tbType = 0;
    
D
dapan1121 已提交
1574
    CTG_ERR_RET(ctgGetTableTypeFromCache(pCtg, pTableName, &tbType, flag));
D
dapan1121 已提交
1575

D
dapan1121 已提交
1576
    CTG_SET_STB(flag, tbType);
D
dapan1121 已提交
1577 1578
  }

D
dapan1121 已提交
1579 1580
  STableMetaOutput *output = NULL;

D
dapan 已提交
1581
  while (true) {
D
dapan1121 已提交
1582
    CTG_ERR_JRET(ctgRefreshTblMeta(pCtg, pRpc, pMgmtEps, pTableName, flag, &output));
D
dapan1121 已提交
1583

D
dapan 已提交
1584 1585 1586 1587
    if (CTG_IS_META_TABLE(output->metaType)) {
      *pTableMeta = output->tbMeta;
      goto _return;
    }
D
dapan1121 已提交
1588

D
dapan 已提交
1589 1590 1591 1592 1593 1594
    if (CTG_IS_META_BOTH(output->metaType)) {
      memcpy(output->tbMeta, &output->ctbMeta, sizeof(output->ctbMeta));
      
      *pTableMeta = output->tbMeta;
      goto _return;
    }
D
dapan1121 已提交
1595

D
dapan 已提交
1596 1597 1598 1599 1600
    if ((!CTG_IS_META_CTABLE(output->metaType)) || output->tbMeta) {
      ctgError("invalid metaType:%d", output->metaType);
      tfree(output->tbMeta);
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
    }
D
dapan1121 已提交
1601

D
dapan 已提交
1602
    // HANDLE ONLY CHILD TABLE META
D
dapan1121 已提交
1603

D
dapan 已提交
1604 1605 1606
    SName stbName = *pTableName;
    strcpy(stbName.tname, output->tbName);
    
D
dapan1121 已提交
1607
    CTG_ERR_JRET(ctgGetTableMetaFromCache(pCtg, &stbName, pTableMeta, &exist, flag));
D
dapan 已提交
1608 1609 1610 1611
    if (0 == exist) {
      ctgDebug("stb no longer exist, dbFName:%s, tbName:%s", output->dbFName, pTableName->tname);
      continue;
    }
D
dapan1121 已提交
1612

D
dapan 已提交
1613 1614 1615 1616
    memcpy(*pTableMeta, &output->ctbMeta, sizeof(output->ctbMeta));

    break;
  }
D
dapan1121 已提交
1617 1618 1619 1620 1621

_return:

  tfree(output);

D
dapan 已提交
1622 1623 1624 1625
  if (*pTableMeta) {
    ctgDebug("tbmeta returned, tbName:%s, tbType:%d", pTableName->tname, (*pTableMeta)->tableType);
  }

D
dapan1121 已提交
1626 1627 1628 1629 1630 1631 1632 1633 1634 1635 1636 1637 1638 1639 1640 1641 1642 1643 1644 1645 1646 1647 1648 1649 1650
  CTG_RET(code);
}



int32_t ctgActUpdateVg(SCtgMetaAction *action) {
  int32_t code = 0;
  SCtgUpdateVgMsg *msg = action->data;
  
  CTG_ERR_JRET(ctgUpdateDBVgInfo(msg->pCtg, msg->dbFName, msg->dbId, &msg->dbInfo));

_return:

  tfree(msg->dbInfo);
  tfree(msg);
  
  CTG_RET(code);
}

int32_t ctgActRemoveDB(SCtgMetaAction *action) {
  int32_t code = 0;
  SCtgRemoveDBMsg *msg = action->data;
  SCatalog* pCtg = msg->pCtg;

  SCtgDBCache *dbCache = NULL;
D
dapan 已提交
1651
  ctgGetDBCache(msg->pCtg, msg->dbFName, &dbCache);
D
dapan1121 已提交
1652 1653 1654 1655 1656 1657 1658 1659 1660 1661 1662 1663 1664 1665 1666 1667 1668 1669 1670 1671 1672 1673 1674 1675 1676 1677 1678 1679
  if (NULL == dbCache) {
    goto _return;
  }
  
  if (dbCache->dbId != msg->dbId) {
    ctgInfo("dbId already updated, dbFName:%s, dbId:%"PRIx64 ", targetId:%"PRIx64, msg->dbFName, dbCache->dbId, msg->dbId);
    goto _return;
  }
  
  CTG_ERR_JRET(ctgRemoveDB(pCtg, dbCache, msg->dbFName));

_return:

  tfree(msg);
  
  CTG_RET(code);
}


int32_t ctgActUpdateTbl(SCtgMetaAction *action) {
  int32_t code = 0;
  SCtgUpdateTblMsg *msg = action->data;
  SCatalog* pCtg = msg->pCtg;
  STableMetaOutput* output = msg->output;
  SCtgDBCache *dbCache = NULL;

  if ((!CTG_IS_META_CTABLE(output->metaType)) && NULL == output->tbMeta) {
    ctgError("no valid tbmeta got from meta rsp, dbFName:%s, tbName:%s", output->dbFName, output->tbName);
D
dapan 已提交
1680
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
1681 1682 1683 1684 1685 1686 1687
  }

  if (CTG_IS_META_BOTH(output->metaType) && TSDB_SUPER_TABLE != output->tbMeta->tableType) {
    ctgError("table type error, expected:%d, actual:%d", TSDB_SUPER_TABLE, output->tbMeta->tableType);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }    

D
dapan1121 已提交
1688 1689 1690 1691 1692
  char *p = strchr(output->dbFName, '.');
  if (p && CTG_IS_INF_DBNAME(p + 1)) {
    memmove(output->dbFName, p + 1, strlen(p + 1));
  }
  
D
dapan1121 已提交
1693 1694 1695 1696 1697 1698 1699 1700 1701 1702 1703 1704 1705 1706 1707 1708 1709 1710
  CTG_ERR_JRET(ctgGetAddDBCache(pCtg, output->dbFName, output->dbId, &dbCache));
  if (NULL == dbCache) {
    ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:%"PRIx64, output->dbFName, output->dbId);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  if (CTG_IS_META_TABLE(output->metaType) || CTG_IS_META_BOTH(output->metaType)) {
    int32_t metaSize = CTG_META_SIZE(output->tbMeta);
    
    CTG_ERR_JRET(ctgUpdateTblMeta(pCtg, dbCache, output->dbFName, output->dbId, output->tbName, output->tbMeta, metaSize));
  }

  if (CTG_IS_META_CTABLE(output->metaType) || CTG_IS_META_BOTH(output->metaType)) {
    CTG_ERR_JRET(ctgUpdateTblMeta(pCtg, dbCache, output->dbFName, output->dbId, output->ctbName, (STableMeta *)&output->ctbMeta, sizeof(output->ctbMeta)));
  }

_return:

D
dapan 已提交
1711 1712 1713
  if (output) {
    tfree(output->tbMeta);
    tfree(output);
D
dapan1121 已提交
1714
  }
D
dapan 已提交
1715
  
D
dapan1121 已提交
1716 1717 1718 1719 1720 1721 1722 1723 1724 1725 1726 1727 1728 1729 1730 1731 1732 1733 1734 1735 1736 1737 1738 1739 1740 1741 1742 1743 1744 1745 1746 1747 1748 1749 1750
  tfree(msg);
  
  CTG_RET(code);
}


int32_t ctgActRemoveStb(SCtgMetaAction *action) {
  int32_t code = 0;
  SCtgRemoveStbMsg *msg = action->data;
  bool removed = false;
  SCatalog* pCtg = msg->pCtg;

  SCtgDBCache *dbCache = NULL;
  ctgGetDBCache(pCtg, msg->dbFName, &dbCache);
  if (NULL == dbCache) {
    return TSDB_CODE_SUCCESS;
  }

  if (dbCache->dbId != msg->dbId) {
    ctgDebug("dbId already modified, dbFName:%s, current:%"PRIx64", dbId:%"PRIx64", stb:%s, suid:%"PRIx64, msg->dbFName, dbCache->dbId, msg->dbId, msg->stbName, msg->suid);
    return TSDB_CODE_SUCCESS;
  }
  
  CTG_LOCK(CTG_WRITE, &dbCache->tbCache.stbLock);
  if (taosHashRemove(dbCache->tbCache.stbCache, &msg->suid, sizeof(msg->suid))) {
    CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock);
    ctgDebug("stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid);
    return TSDB_CODE_SUCCESS;
  }

  CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);
  if (taosHashRemove(dbCache->tbCache.metaCache, msg->stbName, strlen(msg->stbName))) {  
    CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
    CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock);
    ctgError("stb not exist in cache, dbFName:%s, stb:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid);
D
dapan1121 已提交
1751
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
1752 1753 1754 1755 1756 1757 1758
  }  
  CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
  
  CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock);
  
  ctgInfo("stb removed from cache, dbFName:%s, stbName:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid);

D
dapan1121 已提交
1759
  CTG_ERR_JRET(ctgMetaRentRemove(&msg->pCtg->stbRent, msg->suid, ctgStbVersionCompare));
D
dapan1121 已提交
1760 1761 1762 1763 1764 1765 1766 1767 1768 1769 1770 1771 1772 1773 1774 1775 1776 1777 1778 1779 1780
  
  ctgDebug("stb removed from rent, dbFName:%s, stbName:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid);
  
_return:

  tfree(msg);
  
  CTG_RET(code);
}

int32_t ctgActRemoveTbl(SCtgMetaAction *action) {

}



void* ctgUpdateThreadFunc(void* param) {
  setThreadName("catalog");

  qInfo("catalog update thread started");

D
dapan 已提交
1781
  CTG_LOCK(CTG_READ, &gCtgMgmt.lock);
D
dapan1121 已提交
1782 1783
  
  while (true) {
D
dapan 已提交
1784
    tsem_wait(&gCtgMgmt.sem);
D
dapan1121 已提交
1785
    
D
dapan 已提交
1786
    if (atomic_load_8(&gCtgMgmt.exit)) {
D
dapan1121 已提交
1787 1788 1789 1790 1791 1792
      break;
    }

    SCtgMetaAction *action = NULL;
    ctgPopAction(&action);

D
dapan 已提交
1793 1794 1795 1796 1797
    qDebug("process %s action", gCtgAction[action->act].name);
    
    (*gCtgAction[action->act].func)(action);

    CTG_STAT_ADD(gCtgMgmt.stat.runtime.qDoneNum); 
D
dapan1121 已提交
1798 1799
  }

D
dapan 已提交
1800
  CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock);
D
dapan1121 已提交
1801 1802 1803 1804 1805 1806 1807 1808 1809 1810 1811 1812

  qInfo("catalog update thread stopped");
  
  return NULL;
}


int32_t ctgStartUpdateThread() {
  pthread_attr_t thAttr;
  pthread_attr_init(&thAttr);
  pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);

D
dapan 已提交
1813
  if (pthread_create(&gCtgMgmt.updateThread, &thAttr, ctgUpdateThreadFunc, NULL) != 0) {
D
dapan1121 已提交
1814 1815
    terrno = TAOS_SYSTEM_ERROR(errno);
    CTG_ERR_RET(terrno);
D
dapan1121 已提交
1816 1817
  }
  
D
dapan1121 已提交
1818
  pthread_attr_destroy(&thAttr);
D
dapan1121 已提交
1819 1820 1821 1822
  return TSDB_CODE_SUCCESS;
}


D
dapan1121 已提交
1823
int32_t catalogInit(SCatalogCfg *cfg) {
D
dapan 已提交
1824
  if (gCtgMgmt.pCluster) {
D
dapan 已提交
1825
    qError("catalog already initialized");
D
dapan1121 已提交
1826
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
1827 1828
  }

D
dapan 已提交
1829
  atomic_store_8(&gCtgMgmt.exit, false);
D
dapan1121 已提交
1830

D
dapan1121 已提交
1831
  if (cfg) {
D
dapan 已提交
1832
    memcpy(&gCtgMgmt.cfg, cfg, sizeof(*cfg));
H
Haojun Liao 已提交
1833

D
dapan 已提交
1834 1835
    if (gCtgMgmt.cfg.maxDBCacheNum == 0) {
      gCtgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
D
dapan1121 已提交
1836 1837
    }

D
dapan 已提交
1838 1839
    if (gCtgMgmt.cfg.maxTblCacheNum == 0) {
      gCtgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TBLMETA_NUMBER;
D
dapan1121 已提交
1840
    }
D
dapan1121 已提交
1841

D
dapan 已提交
1842 1843
    if (gCtgMgmt.cfg.dbRentSec == 0) {
      gCtgMgmt.cfg.dbRentSec = CTG_DEFAULT_RENT_SECOND;
D
dapan1121 已提交
1844 1845
    }

D
dapan 已提交
1846 1847
    if (gCtgMgmt.cfg.stbRentSec == 0) {
      gCtgMgmt.cfg.stbRentSec = CTG_DEFAULT_RENT_SECOND;
D
dapan1121 已提交
1848
    }
D
dapan1121 已提交
1849
  } else {
D
dapan 已提交
1850 1851 1852 1853
    gCtgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
    gCtgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TBLMETA_NUMBER;
    gCtgMgmt.cfg.dbRentSec = CTG_DEFAULT_RENT_SECOND;
    gCtgMgmt.cfg.stbRentSec = CTG_DEFAULT_RENT_SECOND;
D
dapan 已提交
1854 1855
  }

D
dapan 已提交
1856 1857
  gCtgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == gCtgMgmt.pCluster) {
D
dapan1121 已提交
1858 1859
    qError("taosHashInit %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER);
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
1860 1861
  }

D
dapan1121 已提交
1862 1863
  CTG_ERR_RET(ctgStartUpdateThread());

D
dapan 已提交
1864
  tsem_init(&gCtgMgmt.sem, 0, 0);
D
dapan1121 已提交
1865

D
dapan 已提交
1866 1867
  gCtgMgmt.head = calloc(1, sizeof(SCtgQNode));
  if (NULL == gCtgMgmt.head) {
D
dapan1121 已提交
1868 1869 1870
    qError("calloc %d failed", (int32_t)sizeof(SCtgQNode));
    CTG_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
D
dapan 已提交
1871
  gCtgMgmt.tail = gCtgMgmt.head;
D
dapan1121 已提交
1872

D
dapan 已提交
1873
  qDebug("catalog initialized, maxDb:%u, maxTbl:%u, dbRentSec:%u, stbRentSec:%u", gCtgMgmt.cfg.maxDBCacheNum, gCtgMgmt.cfg.maxTblCacheNum, gCtgMgmt.cfg.dbRentSec, gCtgMgmt.cfg.stbRentSec);
D
dapan1121 已提交
1874

D
dapan 已提交
1875
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1876 1877
}

D
dapan1121 已提交
1878
int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
1879
  if (NULL == catalogHandle) {
D
dapan1121 已提交
1880
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan 已提交
1881 1882
  }

D
dapan 已提交
1883
  if (NULL == gCtgMgmt.pCluster) {
D
dapan 已提交
1884
    qError("catalog cluster cache are not ready, clusterId:%"PRIx64, clusterId);
D
dapan1121 已提交
1885
    CTG_ERR_RET(TSDB_CODE_CTG_NOT_READY);
D
dapan 已提交
1886 1887
  }

D
dapan1121 已提交
1888 1889
  int32_t code = 0;
  SCatalog *clusterCtg = NULL;
D
dapan 已提交
1890

D
dapan1121 已提交
1891
  while (true) {
D
dapan 已提交
1892
    SCatalog **ctg = (SCatalog **)taosHashGet(gCtgMgmt.pCluster, (char*)&clusterId, sizeof(clusterId));
D
dapan 已提交
1893

D
dapan1121 已提交
1894 1895 1896 1897 1898
    if (ctg && (*ctg)) {
      *catalogHandle = *ctg;
      qDebug("got catalog handle from cache, clusterId:%"PRIx64", CTG:%p", clusterId, *ctg);
      return TSDB_CODE_SUCCESS;
    }
D
dapan 已提交
1899

D
dapan1121 已提交
1900 1901 1902 1903 1904 1905
    clusterCtg = calloc(1, sizeof(SCatalog));
    if (NULL == clusterCtg) {
      qError("calloc %d failed", (int32_t)sizeof(SCatalog));
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }

D
dapan1121 已提交
1906 1907
    clusterCtg->clusterId = clusterId;

D
dapan 已提交
1908 1909
    CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->dbRent, gCtgMgmt.cfg.dbRentSec, CTG_RENT_DB));
    CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->stbRent, gCtgMgmt.cfg.stbRentSec, CTG_RENT_STABLE));
D
dapan1121 已提交
1910

D
dapan 已提交
1911
    clusterCtg->dbCache = taosHashInit(gCtgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
D
dapan1121 已提交
1912 1913 1914 1915 1916
    if (NULL == clusterCtg->dbCache) {
      qError("taosHashInit %d dbCache failed", CTG_DEFAULT_CACHE_DB_NUMBER);
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
    }

D
dapan 已提交
1917
    SHashObj *metaCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
D
dapan1121 已提交
1918
    if (NULL == metaCache) {
D
dapan 已提交
1919
      qError("taosHashInit failed, num:%d", gCtgMgmt.cfg.maxTblCacheNum);
D
dapan1121 已提交
1920 1921 1922
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }
    
D
dapan 已提交
1923
    code = taosHashPut(gCtgMgmt.pCluster, &clusterId, sizeof(clusterId), &clusterCtg, POINTER_BYTES);
D
dapan1121 已提交
1924 1925 1926 1927 1928 1929 1930 1931 1932 1933 1934 1935 1936
    if (code) {
      if (HASH_NODE_EXIST(code)) {
        ctgFreeHandle(clusterCtg);
        continue;
      }
      
      qError("taosHashPut CTG to cache failed, clusterId:%"PRIx64, clusterId);
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
    }

    qDebug("add CTG to cache, clusterId:%"PRIx64", CTG:%p", clusterId, clusterCtg);

    break;
D
dapan 已提交
1937
  }
D
dapan1121 已提交
1938 1939

  *catalogHandle = clusterCtg;
D
dapan 已提交
1940
  
D
dapan1121 已提交
1941
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1942 1943 1944 1945 1946 1947 1948 1949

_return:

  ctgFreeHandle(clusterCtg);
  
  CTG_RET(code);
}

D
dapan1121 已提交
1950 1951
void catalogFreeHandle(SCatalog* pCtg) {
  if (NULL == pCtg) {
D
dapan1121 已提交
1952 1953
    return;
  }
D
dapan1121 已提交
1954

D
dapan 已提交
1955
  if (taosHashRemove(gCtgMgmt.pCluster, &pCtg->clusterId, sizeof(pCtg->clusterId))) {
D
dapan1121 已提交
1956
    ctgWarn("taosHashRemove from cluster failed, may already be freed, clusterId:%"PRIx64, pCtg->clusterId);
D
dapan1121 已提交
1957 1958 1959
    return;
  }

D
dapan1121 已提交
1960
  uint64_t clusterId = pCtg->clusterId;
D
dapan1121 已提交
1961
  
D
dapan1121 已提交
1962
  ctgFreeHandle(pCtg);
D
dapan1121 已提交
1963 1964

  ctgInfo("handle freed, culsterId:%"PRIx64, clusterId);
D
dapan 已提交
1965 1966
}

D
dapan1121 已提交
1967
int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version) {
D
dapan1121 已提交
1968 1969
  CTG_API_ENTER();

D
dapan1121 已提交
1970 1971 1972 1973 1974
  if (NULL == pCtg || NULL == dbFName || NULL == version) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }

  if (NULL == pCtg->dbCache) {
D
dapan1121 已提交
1975
    *version = CTG_DEFAULT_INVALID_VERSION;
D
dapan1121 已提交
1976
    ctgInfo("empty db cache, dbFName:%s", dbFName);
D
dapan1121 已提交
1977
    CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
1978 1979
  }

D
dapan1121 已提交
1980 1981 1982
  SCtgDBCache *dbCache = NULL;
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
  if (NULL == dbCache) {
D
dapan1121 已提交
1983
    *version = CTG_DEFAULT_INVALID_VERSION;
D
dapan1121 已提交
1984
    CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
1985 1986
  }

D
dapan 已提交
1987 1988 1989
  bool inCache = false;
  ctgAcquireVgInfo(pCtg, dbCache, &inCache);
  if (!inCache) {
D
dapan1121 已提交
1990
    ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
1991 1992

    *version = CTG_DEFAULT_INVALID_VERSION;
D
dapan1121 已提交
1993
    CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
1994 1995
  }

D
dapan1121 已提交
1996 1997 1998 1999
  *version = dbCache->vgInfo->vgVersion;

  ctgReleaseVgInfo(dbCache);
  ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
2000

D
dapan1121 已提交
2001
  ctgDebug("Got db vgVersion from cache, dbFName:%s, vgVersion:%d", dbFName, *version);
D
dapan1121 已提交
2002

D
dapan1121 已提交
2003
  CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
2004 2005
}

D
dapan1121 已提交
2006
int32_t catalogGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, bool forceUpdate, SArray** vgroupList) {
D
dapan1121 已提交
2007 2008
  CTG_API_ENTER();

D
dapan1121 已提交
2009 2010 2011
  if (NULL == pCtg || NULL == dbFName || NULL == pRpc || NULL == pMgmtEps || NULL == vgroupList) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }
2012

D
dapan1121 已提交
2013
  SCtgDBCache* dbCache = NULL;
2014
  int32_t code = 0;
D
dapan1121 已提交
2015
  SArray *vgList = NULL;
D
dapan1121 已提交
2016 2017 2018 2019 2020 2021 2022
  SHashObj *vgHash = NULL;
  SDBVgInfo *vgInfo = NULL;
  CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pRpc, pMgmtEps, dbFName, forceUpdate, &dbCache, &vgInfo));
  if (dbCache) {
    vgHash = dbCache->vgInfo->vgHash;
  } else {
    vgHash = vgInfo->vgHash;
D
dapan1121 已提交
2023 2024
  }

D
dapan1121 已提交
2025
  CTG_ERR_JRET(ctgGenerateVgList(pCtg, vgHash, &vgList));
D
dapan1121 已提交
2026 2027 2028 2029 2030

  *vgroupList = vgList;
  vgList = NULL;

_return:
D
dapan1121 已提交
2031 2032

  if (dbCache) {
D
dapan1121 已提交
2033 2034
    ctgReleaseVgInfo(dbCache);
    ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
2035 2036
  }

D
dapan1121 已提交
2037 2038 2039
  if (vgInfo) {
    taosHashCleanup(vgInfo->vgHash);
    tfree(vgInfo);
D
dapan1121 已提交
2040 2041
  }

D
dapan1121 已提交
2042
  CTG_API_LEAVE(code);  
D
dapan1121 已提交
2043 2044 2045
}


D
dapan1121 已提交
2046
int32_t catalogUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SDBVgInfo* dbInfo) {
D
dapan1121 已提交
2047
  CTG_API_ENTER();
D
dapan1121 已提交
2048 2049

  int32_t code = 0;
D
dapan1121 已提交
2050
  
D
dapan1121 已提交
2051
  if (NULL == pCtg || NULL == dbFName || NULL == dbInfo) {
D
dapan1121 已提交
2052 2053 2054
    CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT);
  }

D
dapan1121 已提交
2055 2056 2057 2058
  SCtgMetaAction action= {.act = CTG_ACT_UPDATE_VG};
  SCtgUpdateVgMsg *msg = malloc(sizeof(SCtgUpdateVgMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg));
D
dapan1121 已提交
2059
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
2060 2061
  }

D
dapan1121 已提交
2062 2063 2064 2065
  msg->pCtg = pCtg;
  strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  msg->dbId = dbId;
  msg->dbInfo = dbInfo;
D
dapan1121 已提交
2066
  dbInfo = NULL;
D
dapan1121 已提交
2067

D
dapan1121 已提交
2068
  action.data = msg;
D
dapan1121 已提交
2069

D
dapan1121 已提交
2070
  CTG_ERR_JRET(ctgPushAction(&action));
D
dapan1121 已提交
2071

D
dapan 已提交
2072 2073
  ctgDebug("action [%s] added into queue", gCtgAction[action.act].name);

D
dapan1121 已提交
2074 2075
  CTG_API_LEAVE(code);
  
D
dapan1121 已提交
2076 2077
_return:

D
dapan1121 已提交
2078 2079 2080
  if (dbInfo) {
    taosHashCleanup(dbInfo->vgHash);
    tfree(dbInfo);
D
dapan1121 已提交
2081
  }
D
dapan1121 已提交
2082 2083

  tfree(msg);
D
dapan1121 已提交
2084
  
D
dapan1121 已提交
2085
  CTG_API_LEAVE(code);
D
dapan1121 已提交
2086 2087 2088
}


D
dapan1121 已提交
2089 2090 2091
int32_t catalogRemoveDB(SCatalog* pCtg, const char* dbFName, uint64_t dbId) {
  CTG_API_ENTER();

D
dapan1121 已提交
2092 2093
  int32_t code = 0;
  
D
dapan1121 已提交
2094 2095
  if (NULL == pCtg || NULL == dbFName) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
2096 2097
  }

D
dapan1121 已提交
2098
  if (NULL == pCtg->dbCache) {
D
dapan1121 已提交
2099
    CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
2100
  }
D
dapan1121 已提交
2101

D
dapan1121 已提交
2102 2103 2104 2105 2106
  SCtgMetaAction action= {.act = CTG_ACT_REMOVE_DB};
  SCtgRemoveDBMsg *msg = malloc(sizeof(SCtgRemoveDBMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveDBMsg));
    CTG_API_LEAVE(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
2107
  }
D
dapan1121 已提交
2108

D
dapan1121 已提交
2109 2110 2111
  msg->pCtg = pCtg;
  strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  msg->dbId = dbId;
D
dapan1121 已提交
2112

D
dapan1121 已提交
2113 2114 2115 2116
  action.data = msg;

  CTG_ERR_JRET(ctgPushAction(&action));

D
dapan 已提交
2117 2118
  ctgDebug("action [%s] added into queue", gCtgAction[action.act].name);

D
dapan1121 已提交
2119
  CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
2120
  
D
dapan1121 已提交
2121 2122 2123
_return:

  tfree(action.data);
D
dapan1121 已提交
2124

D
dapan1121 已提交
2125
  CTG_API_LEAVE(code);
D
dapan1121 已提交
2126 2127
}

D
dapan1121 已提交
2128 2129 2130
int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, const char* stbName, uint64_t suid) {
  CTG_API_ENTER();

D
dapan 已提交
2131 2132
  int32_t code = 0;
  
D
dapan1121 已提交
2133 2134
  if (NULL == pCtg || NULL == dbFName || NULL == stbName) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan 已提交
2135 2136
  }

D
dapan1121 已提交
2137
  if (NULL == pCtg->dbCache) {
D
dapan1121 已提交
2138
    CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan 已提交
2139
  }
D
dapan1121 已提交
2140 2141 2142 2143 2144 2145

  SCtgMetaAction action= {.act = CTG_ACT_REMOVE_STB};
  SCtgRemoveStbMsg *msg = malloc(sizeof(SCtgRemoveStbMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveStbMsg));
    CTG_API_LEAVE(TSDB_CODE_CTG_MEM_ERROR);
D
dapan 已提交
2146 2147
  }

D
dapan1121 已提交
2148 2149 2150 2151 2152
  msg->pCtg = pCtg;
  strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  strncpy(msg->stbName, stbName, sizeof(msg->stbName));
  msg->dbId = dbId;
  msg->suid = suid;
D
dapan1121 已提交
2153

D
dapan1121 已提交
2154 2155 2156 2157
  action.data = msg;

  CTG_ERR_JRET(ctgPushAction(&action));

D
dapan 已提交
2158 2159
  ctgDebug("action [%s] added into queue", gCtgAction[action.act].name);

D
dapan1121 已提交
2160
  CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan 已提交
2161
  
D
dapan1121 已提交
2162 2163 2164 2165
_return:

  tfree(action.data);

D
dapan1121 已提交
2166
  CTG_API_LEAVE(code);
D
dapan 已提交
2167 2168
}

D
dapan1121 已提交
2169

D
dapan1121 已提交
2170
int32_t catalogGetTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
D
dapan1121 已提交
2171 2172
  CTG_API_ENTER();

D
dapan1121 已提交
2173
  CTG_API_LEAVE(ctgGetTableMeta(pCtg, pTrans, pMgmtEps, pTableName, false, pTableMeta, CTG_FLAG_UNKNOWN_STB));
D
dapan1121 已提交
2174
}
D
dapan1121 已提交
2175

D
dapan1121 已提交
2176
int32_t catalogGetSTableMeta(SCatalog* pCtg, void * pTrans, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
D
dapan1121 已提交
2177 2178
  CTG_API_ENTER();

D
dapan1121 已提交
2179
  CTG_API_LEAVE(ctgGetTableMeta(pCtg, pTrans, pMgmtEps, pTableName, false, pTableMeta, CTG_FLAG_STB));
D
dapan1121 已提交
2180 2181
}

D
dapan1121 已提交
2182 2183 2184 2185 2186 2187 2188 2189 2190 2191 2192 2193 2194
int32_t catalogUpdateSTableMeta(SCatalog* pCtg, STableMetaRsp *rspMsg) {
  CTG_API_ENTER();

  if (NULL == pCtg || NULL == rspMsg) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }

  STableMetaOutput *output = calloc(1, sizeof(STableMetaOutput));
  if (NULL == output) {
    ctgError("malloc %d failed", (int32_t)sizeof(STableMetaOutput));
    CTG_API_LEAVE(TSDB_CODE_CTG_MEM_ERROR);
  }
  
D
dapan1121 已提交
2195 2196
  int32_t code = 0;

D
dapan1121 已提交
2197 2198
  strcpy(output->dbFName, rspMsg->dbFName);
  strcpy(output->tbName, rspMsg->tbName);
D
dapan1121 已提交
2199

D
dapan1121 已提交
2200
  output->dbId = rspMsg->dbId;
D
dapan1121 已提交
2201
  
D
dapan1121 已提交
2202
  SET_META_TYPE_TABLE(output->metaType);
D
dapan1121 已提交
2203
  
D
dapan1121 已提交
2204
  CTG_ERR_JRET(queryCreateTableMetaFromMsg(rspMsg, true, &output->tbMeta));
D
dapan1121 已提交
2205

D
dapan1121 已提交
2206 2207 2208 2209 2210 2211
  SCtgMetaAction action= {.act = CTG_ACT_UPDATE_TBL};
  SCtgUpdateTblMsg *msg = malloc(sizeof(SCtgUpdateTblMsg));
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTblMsg));
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
  }
D
dapan1121 已提交
2212

D
dapan1121 已提交
2213 2214 2215 2216 2217 2218 2219
  msg->pCtg = pCtg;
  msg->output = output;

  action.data = msg;

  CTG_ERR_JRET(ctgPushAction(&action));

D
dapan 已提交
2220 2221
  ctgDebug("action [%s] added into queue", gCtgAction[action.act].name);

D
dapan1121 已提交
2222 2223
  CTG_API_LEAVE(code);
  
D
dapan1121 已提交
2224 2225
_return:

D
dapan1121 已提交
2226 2227 2228
  tfree(output->tbMeta);
  tfree(output);
  tfree(msg);
D
dapan1121 已提交
2229
  
D
dapan1121 已提交
2230
  CTG_API_LEAVE(code);
D
dapan1121 已提交
2231 2232 2233
}


D
dapan1121 已提交
2234
int32_t catalogRefreshTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) {
D
dapan1121 已提交
2235 2236
  CTG_API_ENTER();

D
dapan1121 已提交
2237
  if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pTableName) {
D
dapan1121 已提交
2238 2239 2240
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }

D
dapan1121 已提交
2241
  CTG_API_LEAVE(ctgRefreshTblMeta(pCtg, pTrans, pMgmtEps, pTableName, CTG_GEN_STB_FLAG(isSTable), NULL));
2242
}
2243

D
dapan1121 已提交
2244
int32_t catalogRefreshGetTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable) {
D
dapan1121 已提交
2245 2246
  CTG_API_ENTER();

D
dapan1121 已提交
2247
  CTG_API_LEAVE(ctgGetTableMeta(pCtg, pTrans, pMgmtEps, pTableName, true, pTableMeta, CTG_GEN_STB_FLAG(isSTable)));
D
dapan1121 已提交
2248 2249
}

D
dapan1121 已提交
2250
int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgList) {
D
dapan1121 已提交
2251
  CTG_API_ENTER();
D
dapan1121 已提交
2252 2253 2254 2255

  if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pVgList) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }
D
dapan1121 已提交
2256 2257 2258 2259 2260

  if (CTG_IS_INF_DBNAME(pTableName->dbname)) {
    ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname);
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }
D
dapan1121 已提交
2261 2262 2263 2264
  
  STableMeta *tbMeta = NULL;
  int32_t code = 0;
  SVgroupInfo vgroupInfo = {0};
D
dapan1121 已提交
2265
  SCtgDBCache* dbCache = NULL;
D
dapan1121 已提交
2266
  SArray *vgList = NULL;
D
dapan 已提交
2267
  SDBVgInfo *vgInfo = NULL;
D
dapan1121 已提交
2268

D
dapan1121 已提交
2269
  *pVgList = NULL;
D
dapan1121 已提交
2270
  
D
dapan1121 已提交
2271
  CTG_ERR_JRET(ctgGetTableMeta(pCtg, pRpc, pMgmtEps, pTableName, false, &tbMeta, CTG_FLAG_UNKNOWN_STB));
D
dapan1121 已提交
2272

H
Haojun Liao 已提交
2273 2274
  char db[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, db);
D
dapan 已提交
2275

D
dapan1121 已提交
2276 2277 2278 2279 2280 2281 2282 2283 2284 2285
  SHashObj *vgHash = NULL;  
  CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pRpc, pMgmtEps, db, false, &dbCache, &vgInfo));

  if (dbCache) {
    vgHash = dbCache->vgInfo->vgHash;
  } else {
    vgHash = vgInfo->vgHash;
  }

  /* TODO REMOEV THIS ....
D
dapan 已提交
2286 2287 2288
  if (0 == tbMeta->vgId) {
    SVgroupInfo vgroup = {0};
    
D
dapan1121 已提交
2289
    catalogGetTableHashVgroup(pCtg, pRpc, pMgmtEps, pTableName, &vgroup);
D
dapan 已提交
2290 2291 2292

    tbMeta->vgId = vgroup.vgId;
  }
D
dapan1121 已提交
2293
  // TODO REMOVE THIS ....*/
D
dapan 已提交
2294

2295
  if (tbMeta->tableType == TSDB_SUPER_TABLE) {
D
dapan1121 已提交
2296
    CTG_ERR_JRET(ctgGenerateVgList(pCtg, vgHash, pVgList));
D
dapan1121 已提交
2297
  } else {
2298
    int32_t vgId = tbMeta->vgId;
D
dapan1121 已提交
2299
    if (NULL == taosHashGetClone(vgHash, &vgId, sizeof(vgId), &vgroupInfo)) {
2300
      ctgError("table's vgId not found in vgroup list, vgId:%d, tbName:%s", vgId, tNameGetTableName(pTableName));
D
dapan 已提交
2301
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);    
2302
    }
D
dapan1121 已提交
2303

D
dapan1121 已提交
2304 2305
    vgList = taosArrayInit(1, sizeof(SVgroupInfo));
    if (NULL == vgList) {
D
dapan1121 已提交
2306
      ctgError("taosArrayInit %d failed", (int32_t)sizeof(SVgroupInfo));
D
dapan 已提交
2307 2308 2309
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);    
    }

D
dapan1121 已提交
2310
    if (NULL == taosArrayPush(vgList, &vgroupInfo)) {
2311
      ctgError("taosArrayPush vgroupInfo to array failed, vgId:%d, tbName:%s", vgId, tNameGetTableName(pTableName));
D
dapan1121 已提交
2312 2313
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
    }
D
dapan 已提交
2314

D
dapan1121 已提交
2315
    *pVgList = vgList;
D
dapan1121 已提交
2316 2317
    vgList = NULL;
  }
D
dapan 已提交
2318

D
dapan1121 已提交
2319
_return:
D
dapan 已提交
2320

D
dapan1121 已提交
2321
  if (dbCache) {
D
dapan1121 已提交
2322 2323 2324 2325 2326 2327 2328 2329 2330
    ctgReleaseVgInfo(dbCache);
    ctgReleaseDBCache(pCtg, dbCache);
  }

  tfree(tbMeta);

  if (vgInfo) {
    taosHashCleanup(vgInfo->vgHash);
    tfree(vgInfo);
D
dapan1121 已提交
2331 2332 2333 2334 2335 2336
  }

  if (vgList) {
    taosArrayDestroy(vgList);
    vgList = NULL;
  }
D
dapan1121 已提交
2337
  
D
dapan1121 已提交
2338
  CTG_API_LEAVE(code);
D
dapan1121 已提交
2339 2340 2341
}


D
dapan1121 已提交
2342
int32_t catalogGetTableHashVgroup(SCatalog *pCtg, void *pTrans, const SEpSet *pMgmtEps, const SName *pTableName, SVgroupInfo *pVgroup) {
D
dapan1121 已提交
2343 2344
  CTG_API_ENTER();

D
dapan1121 已提交
2345 2346 2347 2348 2349
  if (CTG_IS_INF_DBNAME(pTableName->dbname)) {
    ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname);
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }

D
dapan1121 已提交
2350 2351
  SCtgDBCache* dbCache = NULL;
  int32_t code = 0;
H
Haojun Liao 已提交
2352 2353
  char db[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, db);
D
dapan1121 已提交
2354

D
dapan1121 已提交
2355
  SDBVgInfo *vgInfo = NULL;
D
dapan1121 已提交
2356
  CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pTrans, pMgmtEps, db, false, &dbCache, &vgInfo));
D
dapan1121 已提交
2357

D
dapan1121 已提交
2358
  CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, vgInfo ? vgInfo : dbCache->vgInfo, pTableName, pVgroup));
D
dapan1121 已提交
2359

D
dapan1121 已提交
2360
_return:
D
dapan1121 已提交
2361

D
dapan1121 已提交
2362
  if (dbCache) {
D
dapan1121 已提交
2363 2364 2365 2366 2367 2368 2369
    ctgReleaseVgInfo(dbCache);
    ctgReleaseDBCache(pCtg, dbCache);
  }

  if (vgInfo) {
    taosHashCleanup(vgInfo->vgHash);
    tfree(vgInfo);
D
dapan1121 已提交
2370
  }
D
dapan1121 已提交
2371

D
dapan1121 已提交
2372
  CTG_API_LEAVE(code);
D
dapan1121 已提交
2373 2374 2375
}


D
dapan1121 已提交
2376
int32_t catalogGetAllMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp) {
D
dapan1121 已提交
2377 2378
  CTG_API_ENTER();

D
dapan1121 已提交
2379
  if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pReq || NULL == pRsp) {
D
dapan1121 已提交
2380 2381 2382
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }

D
dapan1121 已提交
2383
  int32_t code = 0;
D
dapan1121 已提交
2384
  pRsp->pTableMeta = NULL;
D
dapan1121 已提交
2385 2386 2387

  if (pReq->pTableName) {
    int32_t tbNum = (int32_t)taosArrayGetSize(pReq->pTableName);
D
dapan1121 已提交
2388
    if (tbNum <= 0) {
D
dapan1121 已提交
2389
      ctgError("empty table name list, tbNum:%d", tbNum);
D
dapan1121 已提交
2390
      CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
2391
    }
H
Haojun Liao 已提交
2392

D
dapan1121 已提交
2393 2394
    pRsp->pTableMeta = taosArrayInit(tbNum, POINTER_BYTES);
    if (NULL == pRsp->pTableMeta) {
D
dapan1121 已提交
2395
      ctgError("taosArrayInit %d failed", tbNum);
D
dapan1121 已提交
2396
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
2397 2398 2399 2400 2401 2402
    }
    
    for (int32_t i = 0; i < tbNum; ++i) {
      SName *name = taosArrayGet(pReq->pTableName, i);
      STableMeta *pTableMeta = NULL;
      
D
dapan1121 已提交
2403
      CTG_ERR_JRET(ctgGetTableMeta(pCtg, pTrans, pMgmtEps, name, false, &pTableMeta, CTG_FLAG_UNKNOWN_STB));
D
dapan1121 已提交
2404 2405 2406 2407 2408 2409 2410 2411 2412

      if (NULL == taosArrayPush(pRsp->pTableMeta, &pTableMeta)) {
        ctgError("taosArrayPush failed, idx:%d", i);
        tfree(pTableMeta);
        CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
      }
    }
  }

D
dapan1121 已提交
2413
  CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
2414 2415

_return:  
D
dapan1121 已提交
2416

D
dapan1121 已提交
2417 2418 2419 2420 2421 2422 2423 2424
  if (pRsp->pTableMeta) {
    int32_t aSize = taosArrayGetSize(pRsp->pTableMeta);
    for (int32_t i = 0; i < aSize; ++i) {
      STableMeta *pMeta = taosArrayGetP(pRsp->pTableMeta, i);
      tfree(pMeta);
    }
    
    taosArrayDestroy(pRsp->pTableMeta);
D
dapan1121 已提交
2425
    pRsp->pTableMeta = NULL;
D
dapan1121 已提交
2426
  }
D
dapan 已提交
2427
  
D
dapan1121 已提交
2428
  CTG_API_LEAVE(code);
2429
}
D
dapan 已提交
2430

D
dapan1121 已提交
2431
int32_t catalogGetQnodeList(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, SArray* pQnodeList) {
D
dapan1121 已提交
2432 2433
  CTG_API_ENTER();

D
dapan1121 已提交
2434 2435 2436 2437
  if (NULL == pCtg || NULL == pRpc  || NULL == pMgmtEps || NULL == pQnodeList) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }

D
dapan1121 已提交
2438
  //TODO
D
dapan 已提交
2439

D
dapan1121 已提交
2440
  CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan 已提交
2441 2442
}

D
dapan1121 已提交
2443
int32_t catalogGetExpiredSTables(SCatalog* pCtg, SSTableMetaVersion **stables, uint32_t *num) {
D
dapan1121 已提交
2444 2445
  CTG_API_ENTER();

D
dapan1121 已提交
2446 2447
  if (NULL == pCtg || NULL == stables || NULL == num) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
2448 2449
  }

D
dapan1121 已提交
2450 2451 2452 2453
  CTG_API_LEAVE(ctgMetaRentGet(&pCtg->stbRent, (void **)stables, num, sizeof(SSTableMetaVersion)));
}

int32_t catalogGetExpiredDBs(SCatalog* pCtg, SDbVgVersion **dbs, uint32_t *num) {
D
dapan1121 已提交
2454
  CTG_API_ENTER();
D
dapan1121 已提交
2455 2456 2457 2458
  
  if (NULL == pCtg || NULL == dbs || NULL == num) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }
D
dapan1121 已提交
2459

D
dapan1121 已提交
2460
  CTG_API_LEAVE(ctgMetaRentGet(&pCtg->dbRent, (void **)dbs, num, sizeof(SDbVgVersion)));
D
dapan1121 已提交
2461 2462
}

D
dapan 已提交
2463

D
dapan 已提交
2464
void catalogDestroy(void) {
D
dapan1121 已提交
2465 2466
  qInfo("start to destroy catalog");
  
D
dapan 已提交
2467
  if (NULL == gCtgMgmt.pCluster || atomic_load_8(&gCtgMgmt.exit)) {
D
dapan1121 已提交
2468 2469 2470
    return;
  }

D
dapan 已提交
2471 2472 2473
  atomic_store_8(&gCtgMgmt.exit, true);

  tsem_post(&gCtgMgmt.sem);
D
dapan1121 已提交
2474

D
dapan1121 已提交
2475 2476 2477 2478
  while (CTG_IS_LOCKED(&gCtgMgmt.lock)) {
    usleep(1);
  }
  
D
dapan 已提交
2479
  CTG_LOCK(CTG_WRITE, &gCtgMgmt.lock);
D
dapan1121 已提交
2480

D
dapan1121 已提交
2481
  SCatalog *pCtg = NULL;
D
dapan 已提交
2482
  void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
D
dapan1121 已提交
2483
  while (pIter) {
D
dapan1121 已提交
2484
    pCtg = *(SCatalog **)pIter;
D
dapan1121 已提交
2485

D
dapan1121 已提交
2486 2487
    if (pCtg) {
      catalogFreeHandle(pCtg);
D
dapan1121 已提交
2488 2489
    }
    
D
dapan 已提交
2490
    pIter = taosHashIterate(gCtgMgmt.pCluster, pIter);
D
dapan 已提交
2491
  }
D
dapan1121 已提交
2492
  
D
dapan 已提交
2493 2494
  taosHashCleanup(gCtgMgmt.pCluster);
  gCtgMgmt.pCluster = NULL;
D
dapan1121 已提交
2495

D
dapan 已提交
2496
  CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.lock);
D
dapan1121 已提交
2497

D
dapan1121 已提交
2498
  qInfo("catalog destroyed");
D
dapan 已提交
2499 2500 2501 2502
}