catalog.c 78.1 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
H
Haojun Liao 已提交
14 15
 */

D
dapan1121 已提交
16
#include "trpc.h"
D
dapan1121 已提交
17
#include "query.h"
D
dapan1121 已提交
18
#include "tname.h"
H
Haojun Liao 已提交
19
#include "catalogInt.h"
20

D
dapan 已提交
21 22 23 24 25 26
int32_t ctgActUpdateVg(SCtgMetaAction *action);
int32_t ctgActUpdateTbl(SCtgMetaAction *action);
int32_t ctgActRemoveDB(SCtgMetaAction *action);
int32_t ctgActRemoveStb(SCtgMetaAction *action);
int32_t ctgActRemoveTbl(SCtgMetaAction *action);

D
dapan1121 已提交
27
extern SCtgDebug gCTGDebug;
D
dapan 已提交
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
SCatalogMgmt gCtgMgmt = {0};
SCtgAction gCtgAction[CTG_ACT_MAX] = {{
                            CTG_ACT_UPDATE_VG,
                            "update vgInfo",
                            ctgActUpdateVg
                          },
                          {
                            CTG_ACT_UPDATE_TBL,
                            "update tbMeta",
                            ctgActUpdateTbl
                          },
                          {
                            CTG_ACT_REMOVE_DB,
                            "remove DB",
                            ctgActRemoveDB
                          },
                          {
                            CTG_ACT_REMOVE_STB,
                            "remove stbMeta",
                            ctgActRemoveStb
                          },
                          {
                            CTG_ACT_REMOVE_TBL,
                            "remove tbMeta",
                            ctgActRemoveTbl
                          }
};

D
dapan1121 已提交
56 57 58 59 60 61 62 63 64 65 66 67 68
void ctgFreeMetaRent(SCtgRentMgmt *mgmt) {
  if (NULL == mgmt->slots) {
    return;
  }

  for (int32_t i = 0; i < mgmt->slotNum; ++i) {
    SCtgRentSlot *slot = &mgmt->slots[i];
    if (slot->meta) {
      taosArrayDestroy(slot->meta);
      slot->meta = NULL;
    }
  }

wafwerar's avatar
wafwerar 已提交
69
  taosMemoryFreeClear(mgmt->slots);
D
dapan1121 已提交
70 71 72 73 74 75
}


void ctgFreeTableMetaCache(SCtgTbMetaCache *cache) {
  CTG_LOCK(CTG_WRITE, &cache->stbLock);
  if (cache->stbCache) {
D
dapan1121 已提交
76
    int32_t stblNum = taosHashGetSize(cache->stbCache);  
D
dapan1121 已提交
77 78
    taosHashCleanup(cache->stbCache);
    cache->stbCache = NULL;
D
dapan1121 已提交
79
    CTG_CACHE_STAT_SUB(stblNum, stblNum);
D
dapan1121 已提交
80 81 82 83 84
  }
  CTG_UNLOCK(CTG_WRITE, &cache->stbLock);

  CTG_LOCK(CTG_WRITE, &cache->metaLock);
  if (cache->metaCache) {
D
dapan1121 已提交
85
    int32_t tblNum = taosHashGetSize(cache->metaCache);
D
dapan1121 已提交
86 87
    taosHashCleanup(cache->metaCache);
    cache->metaCache = NULL;
D
dapan1121 已提交
88
    CTG_CACHE_STAT_SUB(tblNum, tblNum);
D
dapan1121 已提交
89 90 91 92 93 94 95 96 97 98 99 100 101 102
  }
  CTG_UNLOCK(CTG_WRITE, &cache->metaLock);
}

void ctgFreeVgInfo(SDBVgInfo *vgInfo) {
  if (NULL == vgInfo) {
    return;
  }

  if (vgInfo->vgHash) {
    taosHashCleanup(vgInfo->vgHash);
    vgInfo->vgHash = NULL;
  }
  
wafwerar's avatar
wafwerar 已提交
103
  taosMemoryFreeClear(vgInfo);
D
dapan1121 已提交
104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123
}

void ctgFreeDbCache(SCtgDBCache *dbCache) {
  if (NULL == dbCache) {
    return;
  }

  CTG_LOCK(CTG_WRITE, &dbCache->vgLock);
  ctgFreeVgInfo (dbCache->vgInfo);
  CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);

  ctgFreeTableMetaCache(&dbCache->tbCache);
}


void ctgFreeHandle(SCatalog* pCtg) {
  ctgFreeMetaRent(&pCtg->dbRent);
  ctgFreeMetaRent(&pCtg->stbRent);
  
  if (pCtg->dbCache) {
D
dapan1121 已提交
124 125
    int32_t dbNum = taosHashGetSize(pCtg->dbCache);
    
D
dapan1121 已提交
126 127 128 129 130 131 132 133 134 135 136 137
    void *pIter = taosHashIterate(pCtg->dbCache, NULL);
    while (pIter) {
      SCtgDBCache *dbCache = pIter;

      atomic_store_8(&dbCache->deleted, 1);

      ctgFreeDbCache(dbCache);
            
      pIter = taosHashIterate(pCtg->dbCache, pIter);
    }  

    taosHashCleanup(pCtg->dbCache);
D
dapan1121 已提交
138 139
    
    CTG_CACHE_STAT_SUB(dbNum, dbNum);
D
dapan1121 已提交
140 141
  }
  
wafwerar's avatar
wafwerar 已提交
142
  taosMemoryFree(pCtg);
D
dapan1121 已提交
143 144 145 146 147 148 149 150
}



void ctgWaitAction(SCtgMetaAction *action) {
  while (true) {
    tsem_wait(&gCtgMgmt.queue.rspSem);
    
X
Xiaoyu Wang 已提交
151
    if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) {
D
dapan1121 已提交
152 153 154 155 156 157 158 159 160 161 162 163
      tsem_post(&gCtgMgmt.queue.rspSem);
      break;
    }

    if (gCtgMgmt.queue.seqDone >= action->seqId) {
      break;
    }

    tsem_post(&gCtgMgmt.queue.rspSem);
    sched_yield();
  }
}
D
dapan 已提交
164 165

void ctgPopAction(SCtgMetaAction **action) {
D
dapan1121 已提交
166
  SCtgQNode *orig = gCtgMgmt.queue.head;
D
dapan 已提交
167
  
D
dapan1121 已提交
168 169
  SCtgQNode *node = gCtgMgmt.queue.head->next;
  gCtgMgmt.queue.head = gCtgMgmt.queue.head->next;
D
dapan 已提交
170 171 172

  CTG_QUEUE_SUB();
  
wafwerar's avatar
wafwerar 已提交
173
  taosMemoryFreeClear(orig);
D
dapan 已提交
174 175 176 177 178

  *action = &node->action;
}


D
dapan1121 已提交
179
int32_t ctgPushAction(SCatalog* pCtg, SCtgMetaAction *action) {
wafwerar's avatar
wafwerar 已提交
180
  SCtgQNode *node = taosMemoryCalloc(1, sizeof(SCtgQNode));
D
dapan 已提交
181 182 183 184
  if (NULL == node) {
    qError("calloc %d failed", (int32_t)sizeof(SCtgQNode));
    CTG_RET(TSDB_CODE_CTG_MEM_ERROR);
  }
D
dapan1121 已提交
185 186

  action->seqId = atomic_add_fetch_64(&gCtgMgmt.queue.seqId, 1);
D
dapan 已提交
187 188 189
  
  node->action = *action;

D
dapan1121 已提交
190 191 192 193
  CTG_LOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);
  gCtgMgmt.queue.tail->next = node;
  gCtgMgmt.queue.tail = node;
  CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.queue.qlock);
D
dapan 已提交
194 195

  CTG_QUEUE_ADD();
D
dapan1121 已提交
196
  CTG_RUNTIME_STAT_ADD(qNum, 1);
D
dapan 已提交
197

D
dapan1121 已提交
198
  tsem_post(&gCtgMgmt.queue.reqSem);
D
dapan 已提交
199

D
dapan1121 已提交
200
  ctgDebug("action [%s] added into queue", gCtgAction[action->act].name);
D
dapan 已提交
201

D
dapan1121 已提交
202 203
  if (action->syncReq) {
    ctgWaitAction(action);
D
dapan1121 已提交
204 205
  }

D
dapan1121 已提交
206
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
207 208 209
}


D
dapan1121 已提交
210 211 212
int32_t ctgPushRmDBMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId) {
  int32_t code = 0;
  SCtgMetaAction action= {.act = CTG_ACT_REMOVE_DB};
wafwerar's avatar
wafwerar 已提交
213
  SCtgRemoveDBMsg *msg = taosMemoryMalloc(sizeof(SCtgRemoveDBMsg));
D
dapan1121 已提交
214 215 216 217 218
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveDBMsg));
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
  }

D
dapan1121 已提交
219 220 221 222 223
  char *p = strchr(dbFName, '.');
  if (p && CTG_IS_INF_DBNAME(p + 1)) {
    dbFName = p + 1;
  }

D
dapan1121 已提交
224 225 226 227 228 229
  msg->pCtg = pCtg;
  strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  msg->dbId = dbId;

  action.data = msg;

D
dapan1121 已提交
230
  CTG_ERR_JRET(ctgPushAction(pCtg, &action));
D
dapan1121 已提交
231 232 233 234

  return TSDB_CODE_SUCCESS;

_return:
H
Haojun Liao 已提交
235

wafwerar's avatar
wafwerar 已提交
236
  taosMemoryFreeClear(action.data);
H
Haojun Liao 已提交
237
  CTG_RET(code);
D
dapan1121 已提交
238 239 240
}


D
dapan1121 已提交
241
int32_t ctgPushRmStbMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncReq) {
D
dapan1121 已提交
242
  int32_t code = 0;
D
dapan1121 已提交
243
  SCtgMetaAction action= {.act = CTG_ACT_REMOVE_STB, .syncReq = syncReq};
wafwerar's avatar
wafwerar 已提交
244
  SCtgRemoveStbMsg *msg = taosMemoryMalloc(sizeof(SCtgRemoveStbMsg));
D
dapan1121 已提交
245 246 247 248 249 250 251 252 253 254 255 256 257
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveStbMsg));
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
  }

  msg->pCtg = pCtg;
  strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  strncpy(msg->stbName, stbName, sizeof(msg->stbName));
  msg->dbId = dbId;
  msg->suid = suid;

  action.data = msg;

D
dapan1121 已提交
258
  CTG_ERR_JRET(ctgPushAction(pCtg, &action));
D
dapan1121 已提交
259 260 261 262

  return TSDB_CODE_SUCCESS;

_return:
H
Haojun Liao 已提交
263

wafwerar's avatar
wafwerar 已提交
264
  taosMemoryFreeClear(action.data);
H
Haojun Liao 已提交
265
  CTG_RET(code);
D
dapan1121 已提交
266 267 268 269
}



D
dapan1121 已提交
270
int32_t ctgPushRmTblMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncReq) {
D
dapan1121 已提交
271
  int32_t code = 0;
D
dapan1121 已提交
272
  SCtgMetaAction action= {.act = CTG_ACT_REMOVE_TBL, .syncReq = syncReq};
wafwerar's avatar
wafwerar 已提交
273
  SCtgRemoveTblMsg *msg = taosMemoryMalloc(sizeof(SCtgRemoveTblMsg));
D
dapan1121 已提交
274 275 276 277 278 279 280 281 282 283 284 285
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveTblMsg));
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
  }

  msg->pCtg = pCtg;
  strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  strncpy(msg->tbName, tbName, sizeof(msg->tbName));
  msg->dbId = dbId;

  action.data = msg;

D
dapan1121 已提交
286
  CTG_ERR_JRET(ctgPushAction(pCtg, &action));
D
dapan1121 已提交
287 288 289 290

  return TSDB_CODE_SUCCESS;

_return:
H
Haojun Liao 已提交
291

wafwerar's avatar
wafwerar 已提交
292
  taosMemoryFreeClear(action.data);
H
Haojun Liao 已提交
293
  CTG_RET(code);
D
dapan1121 已提交
294 295
}

D
dapan1121 已提交
296 297 298
int32_t ctgPushUpdateVgMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, SDBVgInfo* dbInfo, bool syncReq) {
  int32_t code = 0;
  SCtgMetaAction action= {.act = CTG_ACT_UPDATE_VG, .syncReq = syncReq};
wafwerar's avatar
wafwerar 已提交
299
  SCtgUpdateVgMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateVgMsg));
D
dapan1121 已提交
300 301 302 303
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg));
    ctgFreeVgInfo(dbInfo);
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
304 305
  }

D
dapan1121 已提交
306 307 308 309 310
  char *p = strchr(dbFName, '.');
  if (p && CTG_IS_INF_DBNAME(p + 1)) {
    dbFName = p + 1;
  }

D
dapan1121 已提交
311 312 313 314
  strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
  msg->pCtg = pCtg;
  msg->dbId = dbId;
  msg->dbInfo = dbInfo;
D
dapan1121 已提交
315

D
dapan1121 已提交
316
  action.data = msg;
D
dapan1121 已提交
317

D
dapan1121 已提交
318
  CTG_ERR_JRET(ctgPushAction(pCtg, &action));
D
dapan1121 已提交
319

D
dapan1121 已提交
320
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
321

D
dapan1121 已提交
322
_return:
D
dapan1121 已提交
323

D
dapan1121 已提交
324
  ctgFreeVgInfo(dbInfo);
wafwerar's avatar
wafwerar 已提交
325
  taosMemoryFreeClear(action.data);
D
dapan1121 已提交
326
  CTG_RET(code);
D
dapan1121 已提交
327 328
}

D
dapan1121 已提交
329 330
int32_t ctgPushUpdateTblMsgInQueue(SCatalog* pCtg, STableMetaOutput *output, bool syncReq) {
  int32_t code = 0;
D
dapan1121 已提交
331
  SCtgMetaAction action= {.act = CTG_ACT_UPDATE_TBL, .syncReq = syncReq};
wafwerar's avatar
wafwerar 已提交
332
  SCtgUpdateTblMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTblMsg));
D
dapan1121 已提交
333 334 335 336 337
  if (NULL == msg) {
    ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTblMsg));
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
  }

D
dapan1121 已提交
338 339 340 341 342
  char *p = strchr(output->dbFName, '.');
  if (p && CTG_IS_INF_DBNAME(p + 1)) {
    memmove(output->dbFName, p + 1, strlen(p + 1));
  }

D
dapan1121 已提交
343 344 345 346 347 348 349 350 351 352 353
  msg->pCtg = pCtg;
  msg->output = output;

  action.data = msg;

  CTG_ERR_JRET(ctgPushAction(pCtg, &action));

  return TSDB_CODE_SUCCESS;
  
_return:

wafwerar's avatar
wafwerar 已提交
354
  taosMemoryFreeClear(msg);
D
dapan1121 已提交
355 356 357 358
  
  CTG_RET(code);
}

D
dapan1121 已提交
359

D
dapan 已提交
360
int32_t ctgAcquireVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache, bool *inCache) {
D
dapan1121 已提交
361 362
  CTG_LOCK(CTG_READ, &dbCache->vgLock);
  
D
dapan 已提交
363 364 365 366 367 368 369 370 371 372
  if (dbCache->deleted) {
    CTG_UNLOCK(CTG_READ, &dbCache->vgLock);

    ctgDebug("db is dropping, dbId:%"PRIx64, dbCache->dbId);
    
    *inCache = false;
    return TSDB_CODE_SUCCESS;
  }

  
D
dapan1121 已提交
373 374 375
  if (NULL == dbCache->vgInfo) {
    CTG_UNLOCK(CTG_READ, &dbCache->vgLock);

D
dapan 已提交
376
    *inCache = false;
D
dapan1121 已提交
377
    ctgDebug("db vgInfo is empty, dbId:%"PRIx64, dbCache->dbId);
D
dapan1121 已提交
378 379 380
    return TSDB_CODE_SUCCESS;
  }

D
dapan 已提交
381 382
  *inCache = true;
  
D
dapan1121 已提交
383 384 385 386 387 388 389 390 391 392 393 394 395 396
  return TSDB_CODE_SUCCESS;
}

int32_t ctgWAcquireVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache) {
  CTG_LOCK(CTG_WRITE, &dbCache->vgLock);

  if (dbCache->deleted) {
    ctgDebug("db is dropping, dbId:%"PRIx64, dbCache->dbId);
    CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
    CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
  }

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
397

D
dapan1121 已提交
398 399 400
void ctgReleaseDBCache(SCatalog *pCtg, SCtgDBCache *dbCache) {
  taosHashRelease(pCtg->dbCache, dbCache);
}
D
dapan1121 已提交
401

D
dapan1121 已提交
402 403 404
void ctgReleaseVgInfo(SCtgDBCache *dbCache) {
  CTG_UNLOCK(CTG_READ, &dbCache->vgLock);
}
D
dapan1121 已提交
405

D
dapan1121 已提交
406 407 408
void ctgWReleaseVgInfo(SCtgDBCache *dbCache) {
  CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
}
D
dapan1121 已提交
409

D
dapan1121 已提交
410

D
dapan 已提交
411
int32_t ctgAcquireDBCacheImpl(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache, bool acquire) {
D
dapan1121 已提交
412 413 414 415 416
  char *p = strchr(dbFName, '.');
  if (p && CTG_IS_INF_DBNAME(p + 1)) {
    dbFName = p + 1;
  }

D
dapan 已提交
417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453
  SCtgDBCache *dbCache = NULL;
  if (acquire) {
    dbCache = (SCtgDBCache *)taosHashAcquire(pCtg->dbCache, dbFName, strlen(dbFName));
  } else {
    dbCache = (SCtgDBCache *)taosHashGet(pCtg->dbCache, dbFName, strlen(dbFName));
  }
  
  if (NULL == dbCache) {
    *pCache = NULL;
    ctgDebug("db not in cache, dbFName:%s", dbFName);
    return TSDB_CODE_SUCCESS;
  }

  if (dbCache->deleted) {
    if (acquire) {
      ctgReleaseDBCache(pCtg, dbCache);
    }    
    
    *pCache = NULL;
    ctgDebug("db is removing from cache, dbFName:%s", dbFName);
    return TSDB_CODE_SUCCESS;
  }

  *pCache = dbCache;
    
  return TSDB_CODE_SUCCESS;
}

int32_t ctgAcquireDBCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache) {
  CTG_RET(ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, true));
}

int32_t ctgGetDBCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache) {
  CTG_RET(ctgAcquireDBCacheImpl(pCtg, dbFName, pCache, false));
}


D
dapan1121 已提交
454
int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache, bool *inCache) {
D
dapan1121 已提交
455 456
  SCtgDBCache *dbCache = NULL;

D
dapan1121 已提交
457
  if (NULL == pCtg->dbCache) {
D
dapan1121 已提交
458 459
    ctgDebug("empty db cache, dbFName:%s", dbFName);
    goto _return;
D
dapan1121 已提交
460 461 462 463
  }

  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
  if (NULL == dbCache) {  
D
dapan1121 已提交
464 465
    ctgDebug("db %s not in cache", dbFName);
    goto _return;
D
dapan1121 已提交
466 467
  }
  
D
dapan 已提交
468 469
  ctgAcquireVgInfo(pCtg, dbCache, inCache);
  if (!(*inCache)) {
D
dapan1121 已提交
470 471
    ctgDebug("vgInfo of db %s not in cache", dbFName);
    goto _return;
D
dapan1121 已提交
472
  }
D
dapan1121 已提交
473

D
dapan1121 已提交
474
  *pCache = dbCache;
D
dapan1121 已提交
475
  *inCache = true;
D
dapan1121 已提交
476

D
dapan1121 已提交
477 478
  CTG_CACHE_STAT_ADD(vgHitNum, 1);

D
dapan1121 已提交
479
  ctgDebug("Got db vgInfo from cache, dbFName:%s", dbFName);
D
dapan1121 已提交
480
  
D
dapan1121 已提交
481 482 483 484 485 486 487 488 489 490 491 492 493
  return TSDB_CODE_SUCCESS;

_return:

  if (dbCache) {
    ctgReleaseDBCache(pCtg, dbCache);
  }

  *pCache = NULL;
  *inCache = false;

  CTG_CACHE_STAT_ADD(vgMissNum, 1);
  
D
dapan1121 已提交
494 495 496
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518
int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, SArray **out) {
  char *msg = NULL;
  int32_t msgLen = 0;

  ctgDebug("try to get qnode list from mnode, mgmtEpInUse:%d", pMgmtEps->inUse);

  int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_MND_QNODE_LIST)](NULL, &msg, 0, &msgLen);
  if (code) {
    ctgError("Build qnode list msg failed, error:%s", tstrerror(code));
    CTG_ERR_RET(code);
  }
  
  SRpcMsg rpcMsg = {
      .msgType = TDMT_MND_QNODE_LIST,
      .pCont   = msg,
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};

  rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
  if (TSDB_CODE_SUCCESS != rpcRsp.code) {
D
dapan1121 已提交
519
    ctgError("error rsp for qnode list, error:%s", tstrerror(rpcRsp.code));
D
dapan1121 已提交
520 521 522 523 524 525 526 527 528
    CTG_ERR_RET(rpcRsp.code);
  }

  code = queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_QNODE_LIST)](out, rpcRsp.pCont, rpcRsp.contLen);
  if (code) {
    ctgError("Process qnode list rsp failed, error:%s", tstrerror(rpcRsp.code));
    CTG_ERR_RET(code);
  }

D
dapan1121 已提交
529
  ctgDebug("Got qnode list from mnode, listNum:%d", (int32_t)taosArrayGetSize(*out));
D
dapan1121 已提交
530 531 532

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
533 534


D
dapan1121 已提交
535
int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, SBuildUseDBInput *input, SUseDbOutput *out) {
D
dapan1121 已提交
536 537 538
  char *msg = NULL;
  int32_t msgLen = 0;

D
dapan 已提交
539
  ctgDebug("try to get db vgInfo from mnode, dbFName:%s", input->db);
D
dapan1121 已提交
540 541 542 543 544 545

  int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)](input, &msg, 0, &msgLen);
  if (code) {
    ctgError("Build use db msg failed, code:%x, db:%s", code, input->db);
    CTG_ERR_RET(code);
  }
D
ut test  
dapan1121 已提交
546
  
D
dapan1121 已提交
547
  SRpcMsg rpcMsg = {
H
Hongze Cheng 已提交
548
      .msgType = TDMT_MND_USE_DB,
D
catalog  
dapan1121 已提交
549
      .pCont   = msg,
D
dapan1121 已提交
550 551 552 553 554 555
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};

  rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
556
  if (TSDB_CODE_SUCCESS != rpcRsp.code) {
D
dapan1121 已提交
557
    ctgError("error rsp for use db, error:%s, db:%s", tstrerror(rpcRsp.code), input->db);
D
dapan1121 已提交
558
    CTG_ERR_RET(rpcRsp.code);
D
dapan1121 已提交
559
  }
D
dapan1121 已提交
560

D
dapan1121 已提交
561 562 563 564 565
  code = queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)](out, rpcRsp.pCont, rpcRsp.contLen);
  if (code) {
    ctgError("Process use db rsp failed, code:%x, db:%s", code, input->db);
    CTG_ERR_RET(code);
  }
D
dapan1121 已提交
566

D
dapan 已提交
567 568
  ctgDebug("Got db vgInfo from mnode, dbFName:%s", input->db);

D
dapan1121 已提交
569 570
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
571

D
dapan1121 已提交
572 573
int32_t ctgIsTableMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32_t *exist) {
  if (NULL == pCtg->dbCache) {
D
dapan1121 已提交
574
    *exist = 0;
D
dapan1121 已提交
575 576 577 578
    ctgWarn("empty db cache, dbFName:%s, tbName:%s", dbFName, tbName);
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
579 580
  SCtgDBCache *dbCache = NULL;
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
D
dapan1121 已提交
581 582
  if (NULL == dbCache) {
    *exist = 0;
D
dapan1121 已提交
583 584 585
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
586
  size_t sz = 0;
D
dapan1121 已提交
587 588 589 590
  CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);  
  STableMeta *tbMeta = taosHashGet(dbCache->tbCache.metaCache, tbName, strlen(tbName));
  CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
  
D
dapan1121 已提交
591
  if (NULL == tbMeta) {
D
dapan 已提交
592
    ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
593
    
D
dapan1121 已提交
594
    *exist = 0;
D
dapan1121 已提交
595
    ctgDebug("tbmeta not in cache, dbFName:%s, tbName:%s", dbFName, tbName);
D
dapan1121 已提交
596 597 598 599
    return TSDB_CODE_SUCCESS;
  }

  *exist = 1;
D
dapan1121 已提交
600

D
dapan 已提交
601
  ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
602
  
D
dapan1121 已提交
603
  ctgDebug("tbmeta is in cache, dbFName:%s, tbName:%s", dbFName, tbName);
D
dapan1121 已提交
604 605 606 607
  
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
608

D
dapan1121 已提交
609
int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STableMeta** pTableMeta, bool *inCache, int32_t flag, uint64_t *dbId) {
D
dapan1121 已提交
610
  if (NULL == pCtg->dbCache) {
D
dapan1121 已提交
611 612
    ctgDebug("empty tbmeta cache, tbName:%s", pTableName->tname);
    goto _return;
D
dapan1121 已提交
613 614
  }

D
dapan1121 已提交
615
  char dbFName[TSDB_DB_FNAME_LEN] = {0};
D
dapan1121 已提交
616
  if (CTG_FLAG_IS_INF_DB(flag)) {
D
dapan1121 已提交
617 618 619 620
    strcpy(dbFName, pTableName->dbname);
  } else {
    tNameGetFullDbName(pTableName, dbFName);
  }
D
dapan1121 已提交
621

D
dapan1121 已提交
622 623
  *pTableMeta = NULL;

D
dapan1121 已提交
624 625
  SCtgDBCache *dbCache = NULL;
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
D
dapan1121 已提交
626
  if (NULL == dbCache) {
D
dapan1121 已提交
627 628
    ctgDebug("db %s not in cache", pTableName->tname);
    goto _return;
D
dapan1121 已提交
629 630
  }
  
H
Haojun Liao 已提交
631
  int32_t sz = 0;
D
dapan1121 已提交
632
  CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);
H
Haojun Liao 已提交
633
  int32_t code = taosHashGetDup_m(dbCache->tbCache.metaCache, pTableName->tname, strlen(pTableName->tname), (void **)pTableMeta, &sz);
D
dapan1121 已提交
634 635
  CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);

D
dapan1121 已提交
636
  if (NULL == *pTableMeta) {
D
dapan1121 已提交
637 638
    ctgReleaseDBCache(pCtg, dbCache);
    ctgDebug("tbl not in cache, dbFName:%s, tbName:%s", dbFName, pTableName->tname);
D
dapan1121 已提交
639
    goto _return;
D
dapan1121 已提交
640 641
  }

D
dapan1121 已提交
642 643 644
  if (dbId) {
    *dbId = dbCache->dbId;
  }
H
Haojun Liao 已提交
645

H
Haojun Liao 已提交
646
  STableMeta* tbMeta = *pTableMeta;
D
dapan1121 已提交
647

D
dapan1121 已提交
648
  if (tbMeta->tableType != TSDB_CHILD_TABLE) {
D
dapan1121 已提交
649
    ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
650
    ctgDebug("Got meta from cache, type:%d, dbFName:%s, tbName:%s", tbMeta->tableType, dbFName, pTableName->tname);
D
dapan1121 已提交
651 652 653 654

    *inCache = true;
    CTG_CACHE_STAT_ADD(tblHitNum, 1);
    
D
dapan1121 已提交
655 656 657
    return TSDB_CODE_SUCCESS;
  }
  
D
dapan1121 已提交
658
  CTG_LOCK(CTG_READ, &dbCache->tbCache.stbLock);
D
dapan1121 已提交
659
  
D
dapan1121 已提交
660
  STableMeta **stbMeta = taosHashGet(dbCache->tbCache.stbCache, &tbMeta->suid, sizeof(tbMeta->suid));
D
dapan1121 已提交
661
  if (NULL == stbMeta || NULL == *stbMeta) {
D
dapan1121 已提交
662
    CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
D
dapan1121 已提交
663 664
    ctgReleaseDBCache(pCtg, dbCache);
    ctgError("stb not in stbCache, suid:%"PRIx64, tbMeta->suid);
wafwerar's avatar
wafwerar 已提交
665
    taosMemoryFreeClear(*pTableMeta);
D
dapan1121 已提交
666
    goto _return;
D
dapan1121 已提交
667
  }
D
dapan1121 已提交
668

D
dapan1121 已提交
669
  if ((*stbMeta)->suid != tbMeta->suid) {    
D
dapan1121 已提交
670
    CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
D
dapan1121 已提交
671
    ctgReleaseDBCache(pCtg, dbCache);
wafwerar's avatar
wafwerar 已提交
672
    taosMemoryFreeClear(*pTableMeta);
D
dapan1121 已提交
673
    ctgError("stable suid in stbCache mis-match, expected suid:%"PRIx64 ",actual suid:%"PRIx64, tbMeta->suid, (*stbMeta)->suid);
D
dapan1121 已提交
674 675
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }
D
dapan1121 已提交
676

D
dapan1121 已提交
677
  int32_t metaSize = CTG_META_SIZE(*stbMeta);
wafwerar's avatar
wafwerar 已提交
678
  *pTableMeta = taosMemoryRealloc(*pTableMeta, metaSize);
D
dapan1121 已提交
679
  if (NULL == *pTableMeta) {    
D
dapan1121 已提交
680
    CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
D
dapan1121 已提交
681
    ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
682
    ctgError("realloc size[%d] failed", metaSize);
D
dapan1121 已提交
683
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
684 685
  }

D
dapan1121 已提交
686 687
  memcpy(&(*pTableMeta)->sversion, &(*stbMeta)->sversion, metaSize - sizeof(SCTableMeta));

D
dapan1121 已提交
688
  CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
D
dapan1121 已提交
689

D
dapan1121 已提交
690
  ctgReleaseDBCache(pCtg, dbCache);
D
dapan 已提交
691

D
dapan1121 已提交
692 693 694
  *inCache = true;
  CTG_CACHE_STAT_ADD(tblHitNum, 1);

D
dapan1121 已提交
695
  ctgDebug("Got tbmeta from cache, dbFName:%s, tbName:%s", dbFName, pTableName->tname);
D
dapan1121 已提交
696
  
D
dapan1121 已提交
697 698 699 700 701 702 703
  return TSDB_CODE_SUCCESS;

_return:

  *inCache = false;
  CTG_CACHE_STAT_ADD(tblMissNum, 1);
  
D
dapan1121 已提交
704 705 706
  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
707
int32_t ctgGetTableTypeFromCache(SCatalog* pCtg, const char* dbFName, const char *tableName, int32_t *tbType) {
D
dapan1121 已提交
708
  if (NULL == pCtg->dbCache) {
D
dapan 已提交
709
    ctgWarn("empty db cache, dbFName:%s, tbName:%s", dbFName, tableName);  
D
dapan1121 已提交
710 711
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
712
  
D
dapan 已提交
713 714
  SCtgDBCache *dbCache = NULL;
  ctgAcquireDBCache(pCtg, dbFName, &dbCache);
D
dapan1121 已提交
715 716 717
  if (NULL == dbCache) {
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
718

D
dapan1121 已提交
719
  CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);
D
dapan 已提交
720
  STableMeta *pTableMeta = (STableMeta *)taosHashAcquire(dbCache->tbCache.metaCache, tableName, strlen(tableName));
D
dapan1121 已提交
721

D
dapan1121 已提交
722
  if (NULL == pTableMeta) {
D
dapan1121 已提交
723
    CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
D
dapan 已提交
724
    ctgWarn("tbl not in cache, dbFName:%s, tbName:%s", dbFName, tableName);  
D
dapan 已提交
725
    ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
726
    
D
dapan1121 已提交
727 728 729
    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
730 731
  *tbType = atomic_load_8(&pTableMeta->tableType);

D
dapan1121 已提交
732 733 734 735
  taosHashRelease(dbCache->tbCache.metaCache, pTableMeta);

  CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);

D
dapan 已提交
736
  ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
737

D
dapan 已提交
738
  ctgDebug("Got tbtype from cache, dbFName:%s, tbName:%s, type:%d", dbFName, tableName, *tbType);  
D
dapan1121 已提交
739 740 741 742
  
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
743
int32_t ctgGetTableMetaFromMnodeImpl(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, char *dbFName, char* tbName, STableMetaOutput* output) {
D
dapan1121 已提交
744
  SBuildTableMetaInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = tbName};
D
dapan1121 已提交
745 746 747 748
  char *msg = NULL;
  SEpSet *pVnodeEpSet = NULL;
  int32_t msgLen = 0;

D
dapan1121 已提交
749
  ctgDebug("try to get table meta from mnode, dbFName:%s, tbName:%s", dbFName, tbName);
D
dapan1121 已提交
750

D
dapan1121 已提交
751
  int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_META)](&bInput, &msg, 0, &msgLen);
D
dapan1121 已提交
752 753 754 755
  if (code) {
    ctgError("Build mnode stablemeta msg failed, code:%x", code);
    CTG_ERR_RET(code);
  }
D
dapan1121 已提交
756 757

  SRpcMsg rpcMsg = {
D
dapan1121 已提交
758
      .msgType = TDMT_MND_TABLE_META,
D
dapan1121 已提交
759 760 761
      .pCont   = msg,
      .contLen = msgLen,
  };
D
dapan1121 已提交
762

D
dapan1121 已提交
763 764
  SRpcMsg rpcRsp = {0};

D
dapan1121 已提交
765
  rpcSendRecv(pTrans, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
766 767
  
  if (TSDB_CODE_SUCCESS != rpcRsp.code) {
D
dapan1121 已提交
768
    if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) {
D
dapan1121 已提交
769
      SET_META_TYPE_NULL(output->metaType);
D
dapan1121 已提交
770
      ctgDebug("stablemeta not exist in mnode, dbFName:%s, tbName:%s", dbFName, tbName);
D
dapan1121 已提交
771 772 773
      return TSDB_CODE_SUCCESS;
    }
    
D
dapan1121 已提交
774
    ctgError("error rsp for stablemeta from mnode, code:%s, dbFName:%s, tbName:%s", tstrerror(rpcRsp.code), dbFName, tbName);
D
dapan1121 已提交
775 776 777
    CTG_ERR_RET(rpcRsp.code);
  }

D
dapan1121 已提交
778
  code = queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)](output, rpcRsp.pCont, rpcRsp.contLen);
D
dapan1121 已提交
779
  if (code) {
D
dapan1121 已提交
780
    ctgError("Process mnode stablemeta rsp failed, code:%x, dbFName:%s, tbName:%s", code, dbFName, tbName);
D
dapan1121 已提交
781 782 783
    CTG_ERR_RET(code);
  }

D
dapan1121 已提交
784
  ctgDebug("Got table meta from mnode, dbFName:%s, tbName:%s", dbFName, tbName);
D
dapan1121 已提交
785 786 787 788

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
789
int32_t ctgGetTableMetaFromMnode(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, STableMetaOutput* output) {
D
dapan1121 已提交
790 791
  char dbFName[TSDB_DB_FNAME_LEN];
  tNameGetFullDbName(pTableName, dbFName);
D
dapan1121 已提交
792

D
dapan1121 已提交
793
  return ctgGetTableMetaFromMnodeImpl(pCtg, pTrans, pMgmtEps, dbFName, (char *)pTableName->tname, output);
D
dapan1121 已提交
794
}
D
dapan1121 已提交
795

D
dapan1121 已提交
796
int32_t ctgGetTableMetaFromVnodeImpl(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) {
D
dapan1121 已提交
797
  if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pTableName || NULL == vgroupInfo || NULL == output) {
D
dapan1121 已提交
798
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
799 800
  }

D
dapan1121 已提交
801 802
  char dbFName[TSDB_DB_FNAME_LEN];
  tNameGetFullDbName(pTableName, dbFName);
D
dapan1121 已提交
803

D
dapan1121 已提交
804
  ctgDebug("try to get table meta from vnode, dbFName:%s, tbName:%s", dbFName, tNameGetTableName(pTableName));
D
dapan1121 已提交
805

D
dapan1121 已提交
806
  SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char *)tNameGetTableName(pTableName)};
D
dapan1121 已提交
807 808 809
  char *msg = NULL;
  int32_t msgLen = 0;

D
dapan1121 已提交
810 811
  int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)](&bInput, &msg, 0, &msgLen);
  if (code) {
D
dapan1121 已提交
812
    ctgError("Build vnode tablemeta msg failed, code:%x, dbFName:%s, tbName:%s", code, dbFName, tNameGetTableName(pTableName));
D
dapan1121 已提交
813 814
    CTG_ERR_RET(code);
  }
D
dapan1121 已提交
815 816

  SRpcMsg rpcMsg = {
H
Hongze Cheng 已提交
817
      .msgType = TDMT_VND_TABLE_META,
D
dapan1121 已提交
818 819 820 821 822
      .pCont   = msg,
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
L
Liu Jicong 已提交
823
  rpcSendRecv(pTrans, &vgroupInfo->epSet, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
824
  
D
dapan1121 已提交
825
  if (TSDB_CODE_SUCCESS != rpcRsp.code) {
D
dapan1121 已提交
826
    if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) {
D
dapan1121 已提交
827
      SET_META_TYPE_NULL(output->metaType);
D
dapan1121 已提交
828
      ctgDebug("tablemeta not exist in vnode, dbFName:%s, tbName:%s", dbFName, tNameGetTableName(pTableName));
D
dapan1121 已提交
829 830 831
      return TSDB_CODE_SUCCESS;
    }
  
D
dapan1121 已提交
832
    ctgError("error rsp for table meta from vnode, code:%s, dbFName:%s, tbName:%s", tstrerror(rpcRsp.code), dbFName, tNameGetTableName(pTableName));
D
dapan1121 已提交
833
    CTG_ERR_RET(rpcRsp.code);
D
dapan1121 已提交
834 835
  }

D
dapan1121 已提交
836 837
  code = queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)](output, rpcRsp.pCont, rpcRsp.contLen);
  if (code) {
D
dapan1121 已提交
838
    ctgError("Process vnode tablemeta rsp failed, code:%s, dbFName:%s, tbName:%s", tstrerror(code), dbFName, tNameGetTableName(pTableName));
D
dapan1121 已提交
839 840 841
    CTG_ERR_RET(code);
  }

D
dapan1121 已提交
842
  ctgDebug("Got table meta from vnode, dbFName:%s, tbName:%s", dbFName, tNameGetTableName(pTableName));
D
dapan1121 已提交
843 844 845
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871
int32_t ctgGetTableMetaFromVnode(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* output) {
  int32_t code = 0;
  int32_t retryNum = 0;
  
  while (retryNum < CTG_DEFAULT_MAX_RETRY_TIMES) {
    code = ctgGetTableMetaFromVnodeImpl(pCtg, pTrans, pMgmtEps, pTableName, vgroupInfo, output);
    if (code) {
      if (TSDB_CODE_VND_HASH_MISMATCH == code) {
        char dbFName[TSDB_DB_FNAME_LEN];
        tNameGetFullDbName(pTableName, dbFName);
        
        code = catalogRefreshDBVgInfo(pCtg, pTrans, pMgmtEps, dbFName);
        if (code != TSDB_CODE_SUCCESS) {
          break;
        }

        ++retryNum;
        continue;
      }
    }

    break;
  }

  CTG_RET(code);
}
D
dapan1121 已提交
872

873 874
int32_t ctgGetHashFunction(int8_t hashMethod, tableNameHashFp *fp) {
  switch (hashMethod) {
D
dapan1121 已提交
875 876 877 878 879 880 881 882
    default:
      *fp = MurmurHash3_32;
      break;
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
883
int32_t ctgGenerateVgList(SCatalog *pCtg, SHashObj *vgHash, SArray** pList) {
D
dapan1121 已提交
884
  SHashObj *vgroupHash = NULL;
885
  SVgroupInfo *vgInfo = NULL;
D
dapan1121 已提交
886 887
  SArray *vgList = NULL;
  int32_t code = 0;
D
dapan1121 已提交
888
  int32_t vgNum = taosHashGetSize(vgHash);
889

D
dapan1121 已提交
890
  vgList = taosArrayInit(vgNum, sizeof(SVgroupInfo));
D
dapan1121 已提交
891
  if (NULL == vgList) {
D
dapan1121 已提交
892
    ctgError("taosArrayInit failed, num:%d", vgNum);
D
dapan 已提交
893 894 895
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);    
  }

D
dapan1121 已提交
896
  void *pIter = taosHashIterate(vgHash, NULL);
897 898
  while (pIter) {
    vgInfo = pIter;
D
dapan1121 已提交
899

D
dapan1121 已提交
900
    if (NULL == taosArrayPush(vgList, vgInfo)) {
D
dapan1121 已提交
901
      ctgError("taosArrayPush failed, vgId:%d", vgInfo->vgId);
D
dapan1121 已提交
902
      taosHashCancelIterate(vgHash, pIter);      
D
dapan1121 已提交
903
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
904 905
    }
    
D
dapan1121 已提交
906
    pIter = taosHashIterate(vgHash, pIter);
907
    vgInfo = NULL;
D
dapan1121 已提交
908 909
  }

D
dapan1121 已提交
910
  *pList = vgList;
D
dapan1121 已提交
911

D
dapan1121 已提交
912
  ctgDebug("Got vgList from cache, vgNum:%d", vgNum);
D
dapan1121 已提交
913

D
dapan1121 已提交
914
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
915 916 917 918 919 920 921 922

_return:

  if (vgList) {
    taosArrayDestroy(vgList);
  }

  CTG_RET(code);
D
dapan1121 已提交
923 924
}

D
dapan1121 已提交
925
int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup) {
D
dapan1121 已提交
926 927
  int32_t code = 0;
  
D
dapan1121 已提交
928
  int32_t vgNum = taosHashGetSize(dbInfo->vgHash);
H
Haojun Liao 已提交
929 930 931
  char db[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, db);

932
  if (vgNum <= 0) {
D
dapan1121 已提交
933
    ctgError("db vgroup cache invalid, db:%s, vgroup number:%d", db, vgNum);
D
dapan1121 已提交
934
    CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED);
D
dapan1121 已提交
935 936
  }

937 938
  tableNameHashFp fp = NULL;
  SVgroupInfo *vgInfo = NULL;
D
dapan1121 已提交
939

D
dapan1121 已提交
940
  CTG_ERR_RET(ctgGetHashFunction(dbInfo->hashMethod, &fp));
941 942

  char tbFullName[TSDB_TABLE_FNAME_LEN];
H
Haojun Liao 已提交
943
  tNameExtractFullName(pTableName, tbFullName);
944 945 946

  uint32_t hashValue = (*fp)(tbFullName, (uint32_t)strlen(tbFullName));

D
dapan1121 已提交
947
  void *pIter = taosHashIterate(dbInfo->vgHash, NULL);
948 949 950
  while (pIter) {
    vgInfo = pIter;
    if (hashValue >= vgInfo->hashBegin && hashValue <= vgInfo->hashEnd) {
D
dapan1121 已提交
951
      taosHashCancelIterate(dbInfo->vgHash, pIter);
952
      break;
D
dapan1121 已提交
953
    }
954
    
D
dapan1121 已提交
955
    pIter = taosHashIterate(dbInfo->vgHash, pIter);
956
    vgInfo = NULL;
D
dapan1121 已提交
957 958
  }

959
  if (NULL == vgInfo) {
D
dapan1121 已提交
960 961
    ctgError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, db, taosHashGetSize(dbInfo->vgHash));
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
962 963 964 965
  }

  *pVgroup = *vgInfo;

966
  CTG_RET(code);
D
dapan1121 已提交
967 968
}

D
dapan1121 已提交
969
int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2) {
D
dapan 已提交
970
  if (*(uint64_t *)key1 < ((SSTableMetaVersion*)key2)->suid) {
D
dapan1121 已提交
971
    return -1;
D
dapan 已提交
972
  } else if (*(uint64_t *)key1 > ((SSTableMetaVersion*)key2)->suid) {
D
dapan1121 已提交
973 974 975 976 977 978
    return 1;
  } else {
    return 0;
  }
}

D
dapan1121 已提交
979
int32_t ctgDbVgVersionSearchCompare(const void* key1, const void* key2) {
D
dapan1121 已提交
980
  if (*(int64_t *)key1 < ((SDbVgVersion*)key2)->dbId) {
D
dapan1121 已提交
981
    return -1;
D
dapan1121 已提交
982
  } else if (*(int64_t *)key1 > ((SDbVgVersion*)key2)->dbId) {
D
dapan1121 已提交
983 984 985
    return 1;
  } else {
    return 0;
D
dapan1121 已提交
986
  }
D
dapan1121 已提交
987 988
}

D
dapan1121 已提交
989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009
int32_t ctgStbVersionSortCompare(const void* key1, const void* key2) {
  if (((SSTableMetaVersion*)key1)->suid < ((SSTableMetaVersion*)key2)->suid) {
    return -1;
  } else if (((SSTableMetaVersion*)key1)->suid > ((SSTableMetaVersion*)key2)->suid) {
    return 1;
  } else {
    return 0;
  }
}

int32_t ctgDbVgVersionSortCompare(const void* key1, const void* key2) {
  if (((SDbVgVersion*)key1)->dbId < ((SDbVgVersion*)key2)->dbId) {
    return -1;
  } else if (((SDbVgVersion*)key1)->dbId > ((SDbVgVersion*)key2)->dbId) {
    return 1;
  } else {
    return 0;
  }
}


D
dapan1121 已提交
1010
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) {
D
dapan1121 已提交
1011 1012 1013 1014
  mgmt->slotRIdx = 0;
  mgmt->slotNum = rentSec / CTG_RENT_SLOT_SECOND;
  mgmt->type = type;

D
dapan1121 已提交
1015
  size_t msgSize = sizeof(SCtgRentSlot) * mgmt->slotNum;
D
dapan1121 已提交
1016
  
wafwerar's avatar
wafwerar 已提交
1017
  mgmt->slots = taosMemoryCalloc(1, msgSize);
D
dapan1121 已提交
1018 1019
  if (NULL == mgmt->slots) {
    qError("calloc %d failed", (int32_t)msgSize);
D
dapan 已提交
1020
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
1021
  }
D
dapan1121 已提交
1022

D
dapan1121 已提交
1023 1024 1025 1026
  qDebug("meta rent initialized, type:%d, slotNum:%d", type, mgmt->slotNum);
  
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
1027

D
dapan1121 已提交
1028

D
dapan1121 已提交
1029
int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size) {
1030
  int16_t widx = abs((int)(id % mgmt->slotNum));
D
dapan1121 已提交
1031

D
dapan1121 已提交
1032
  SCtgRentSlot *slot = &mgmt->slots[widx];
D
dapan1121 已提交
1033 1034 1035 1036 1037 1038 1039 1040
  int32_t code = 0;
  
  CTG_LOCK(CTG_WRITE, &slot->lock);
  if (NULL == slot->meta) {
    slot->meta = taosArrayInit(CTG_DEFAULT_RENT_SLOT_SIZE, size);
    if (NULL == slot->meta) {
      qError("taosArrayInit %d failed, id:%"PRIx64", slot idx:%d, type:%d", CTG_DEFAULT_RENT_SLOT_SIZE, id, widx, mgmt->type);
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
1041
    }
D
dapan1121 已提交
1042
  }
D
dapan1121 已提交
1043

D
dapan1121 已提交
1044 1045 1046
  if (NULL == taosArrayPush(slot->meta, meta)) {
    qError("taosArrayPush meta to rent failed, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
1047 1048
  }

D
dapan1121 已提交
1049
  slot->needSort = true;
D
dapan1121 已提交
1050

D
dapan1121 已提交
1051
  qDebug("add meta to rent, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
D
dapan1121 已提交
1052

D
dapan1121 已提交
1053 1054 1055 1056 1057 1058
_return:

  CTG_UNLOCK(CTG_WRITE, &slot->lock);
  CTG_RET(code);
}

D
dapan1121 已提交
1059
int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size, __compar_fn_t sortCompare, __compar_fn_t searchCompare) {
1060
  int16_t widx = abs((int)(id % mgmt->slotNum));
D
dapan1121 已提交
1061

D
dapan1121 已提交
1062
  SCtgRentSlot *slot = &mgmt->slots[widx];
D
dapan1121 已提交
1063
  int32_t code = 0;
1064

D
dapan1121 已提交
1065 1066
  CTG_LOCK(CTG_WRITE, &slot->lock);
  if (NULL == slot->meta) {
D
dapan1121 已提交
1067 1068
    qError("empty meta slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
1069 1070 1071
  }

  if (slot->needSort) {
D
dapan1121 已提交
1072
    qDebug("meta slot before sorte, slot idx:%d, type:%d, size:%d", widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta));
D
dapan1121 已提交
1073
    taosArraySort(slot->meta, sortCompare);
D
dapan1121 已提交
1074
    slot->needSort = false;
D
dapan1121 已提交
1075
    qDebug("meta slot sorted, slot idx:%d, type:%d, size:%d", widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta));
D
dapan1121 已提交
1076 1077
  }

D
dapan1121 已提交
1078
  void *orig = taosArraySearch(slot->meta, &id, searchCompare, TD_EQ);
D
dapan1121 已提交
1079
  if (NULL == orig) {
D
dapan1121 已提交
1080
    qError("meta not found in slot, id:%"PRIx64", slot idx:%d, type:%d, size:%d", id, widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta));
D
dapan1121 已提交
1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  memcpy(orig, meta, size);

  qDebug("meta in rent updated, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);

_return:

  CTG_UNLOCK(CTG_WRITE, &slot->lock);

  if (code) {
    qWarn("meta in rent update failed, will try to add it, code:%x, id:%"PRIx64", slot idx:%d, type:%d", code, id, widx, mgmt->type);
    CTG_RET(ctgMetaRentAdd(mgmt, meta, id, size));
  }

  CTG_RET(code);
}

D
dapan1121 已提交
1100
int32_t ctgMetaRentRemove(SCtgRentMgmt *mgmt, int64_t id, __compar_fn_t sortCompare, __compar_fn_t searchCompare) {
1101
  int16_t widx = abs((int)(id % mgmt->slotNum));
D
dapan1121 已提交
1102

D
dapan1121 已提交
1103
  SCtgRentSlot *slot = &mgmt->slots[widx];
D
dapan1121 已提交
1104 1105 1106 1107 1108 1109 1110 1111 1112
  int32_t code = 0;
  
  CTG_LOCK(CTG_WRITE, &slot->lock);
  if (NULL == slot->meta) {
    qError("empty meta slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  if (slot->needSort) {
D
dapan1121 已提交
1113
    taosArraySort(slot->meta, sortCompare);
D
dapan1121 已提交
1114 1115 1116 1117
    slot->needSort = false;
    qDebug("meta slot sorted, slot idx:%d, type:%d", widx, mgmt->type);
  }

D
dapan1121 已提交
1118
  int32_t idx = taosArraySearchIdx(slot->meta, &id, searchCompare, TD_EQ);
D
dapan1121 已提交
1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135
  if (idx < 0) {
    qError("meta not found in slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  taosArrayRemove(slot->meta, idx);

  qDebug("meta in rent removed, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type);

_return:

  CTG_UNLOCK(CTG_WRITE, &slot->lock);

  CTG_RET(code);
}


D
dapan1121 已提交
1136
int32_t ctgMetaRentGetImpl(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
D
dapan1121 已提交
1137 1138 1139 1140
  int16_t ridx = atomic_add_fetch_16(&mgmt->slotRIdx, 1);
  if (ridx >= mgmt->slotNum) {
    ridx %= mgmt->slotNum;
    atomic_store_16(&mgmt->slotRIdx, ridx);
D
dapan1121 已提交
1141
  }
D
dapan1121 已提交
1142

D
dapan1121 已提交
1143
  SCtgRentSlot *slot = &mgmt->slots[ridx];
D
dapan1121 已提交
1144
  int32_t code = 0;
D
dapan1121 已提交
1145
  
D
dapan1121 已提交
1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160
  CTG_LOCK(CTG_READ, &slot->lock);
  if (NULL == slot->meta) {
    qDebug("empty meta in slot:%d, type:%d", ridx, mgmt->type);
    *num = 0;
    goto _return;
  }

  size_t metaNum = taosArrayGetSize(slot->meta);
  if (metaNum <= 0) {
    qDebug("no meta in slot:%d, type:%d", ridx, mgmt->type);
    *num = 0;
    goto _return;
  }

  size_t msize = metaNum * size;
wafwerar's avatar
wafwerar 已提交
1161
  *res = taosMemoryMalloc(msize);
D
dapan1121 已提交
1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181
  if (NULL == *res) {
    qError("malloc %d failed", (int32_t)msize);
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
  }

  void *meta = taosArrayGet(slot->meta, 0);

  memcpy(*res, meta, msize);

  *num = (uint32_t)metaNum;

  qDebug("Got %d meta from rent, type:%d", (int32_t)metaNum, mgmt->type);

_return:

  CTG_UNLOCK(CTG_READ, &slot->lock);

  CTG_RET(code);
}

D
dapan1121 已提交
1182
int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size) {
D
dapan1121 已提交
1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201
  while (true) {
    int64_t msec = taosGetTimestampMs();
    int64_t lsec = atomic_load_64(&mgmt->lastReadMsec);
    if ((msec - lsec) < CTG_RENT_SLOT_SECOND * 1000) {
      *res = NULL;
      *num = 0;
      qDebug("too short time period to get expired meta, type:%d", mgmt->type);
      return TSDB_CODE_SUCCESS;
    }

    if (lsec != atomic_val_compare_exchange_64(&mgmt->lastReadMsec, lsec, msec)) {
      continue;
    }

    break;
  }

  CTG_ERR_RET(ctgMetaRentGetImpl(mgmt, res, num, size));

D
dapan1121 已提交
1202 1203 1204
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1205
int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
D
dapan1121 已提交
1206
  int32_t code = 0;
D
dapan1121 已提交
1207

D
dapan1121 已提交
1208 1209 1210
  SCtgDBCache newDBCache = {0};
  newDBCache.dbId = dbId;

D
dapan 已提交
1211
  newDBCache.tbCache.metaCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
D
dapan1121 已提交
1212
  if (NULL == newDBCache.tbCache.metaCache) {
D
dapan 已提交
1213
    ctgError("taosHashInit %d metaCache failed", gCtgMgmt.cfg.maxTblCacheNum);
D
dapan1121 已提交
1214 1215 1216
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
  }

D
dapan 已提交
1217
  newDBCache.tbCache.stbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
D
dapan1121 已提交
1218
  if (NULL == newDBCache.tbCache.stbCache) {
D
dapan 已提交
1219
    ctgError("taosHashInit %d stbCache failed", gCtgMgmt.cfg.maxTblCacheNum);
D
dapan1121 已提交
1220 1221 1222 1223
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
  }

  code = taosHashPut(pCtg->dbCache, dbFName, strlen(dbFName), &newDBCache, sizeof(SCtgDBCache));
D
dapan1121 已提交
1224 1225 1226
  if (code) {
    if (HASH_NODE_EXIST(code)) {
      ctgDebug("db already in cache, dbFName:%s", dbFName);
D
dapan1121 已提交
1227
      goto _return;
D
dapan1121 已提交
1228 1229 1230
    }
    
    ctgError("taosHashPut db to cache failed, dbFName:%s", dbFName);
D
dapan1121 已提交
1231 1232
    CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
  }
D
dapan1121 已提交
1233 1234

  CTG_CACHE_STAT_ADD(dbNum, 1);
D
dapan1121 已提交
1235
 
D
dapan1121 已提交
1236
  SDbVgVersion vgVersion = {.dbId = newDBCache.dbId, .vgVersion = -1};
D
dapan1121 已提交
1237 1238
  strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));

D
dapan1121 已提交
1239
  ctgDebug("db added to cache, dbFName:%s, dbId:%"PRIx64, dbFName, dbId);
D
dapan1121 已提交
1240

D
dapan1121 已提交
1241 1242 1243
  CTG_ERR_RET(ctgMetaRentAdd(&pCtg->dbRent, &vgVersion, dbId, sizeof(SDbVgVersion)));

  ctgDebug("db added to rent, dbFName:%s, vgVersion:%d, dbId:%"PRIx64, dbFName, vgVersion.vgVersion, dbId);
D
dapan1121 已提交
1244

D
dapan1121 已提交
1245
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1246

D
dapan1121 已提交
1247
_return:
D
dapan1121 已提交
1248

D
dapan1121 已提交
1249
  ctgFreeDbCache(&newDBCache);
D
dapan1121 已提交
1250

D
dapan1121 已提交
1251 1252
  CTG_RET(code);
}
D
dapan1121 已提交
1253

D
dapan1121 已提交
1254

D
dapan1121 已提交
1255
void ctgRemoveStbRent(SCatalog* pCtg, SCtgTbMetaCache *cache) {
D
dapan1121 已提交
1256 1257 1258 1259 1260
  CTG_LOCK(CTG_WRITE, &cache->stbLock);
  if (cache->stbCache) {
    void *pIter = taosHashIterate(cache->stbCache, NULL);
    while (pIter) {
      uint64_t *suid = NULL;
H
Haojun Liao 已提交
1261
      suid = taosHashGetKey(pIter, NULL);
D
dapan1121 已提交
1262

D
dapan1121 已提交
1263
      if (TSDB_CODE_SUCCESS == ctgMetaRentRemove(&pCtg->stbRent, *suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare)) {
D
dapan1121 已提交
1264 1265 1266 1267 1268 1269 1270 1271 1272 1273
        ctgDebug("stb removed from rent, suid:%"PRIx64, *suid);
      }
          
      pIter = taosHashIterate(cache->stbCache, pIter);
    }
  }
  CTG_UNLOCK(CTG_WRITE, &cache->stbLock);
}


D
dapan1121 已提交
1274
int32_t ctgRemoveDB(SCatalog* pCtg, SCtgDBCache *dbCache, const char* dbFName) {
D
dapan 已提交
1275 1276 1277 1278
  uint64_t dbId = dbCache->dbId;
  
  ctgInfo("start to remove db from cache, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId);

D
dapan1121 已提交
1279 1280 1281 1282 1283 1284
  atomic_store_8(&dbCache->deleted, 1);

  ctgRemoveStbRent(pCtg, &dbCache->tbCache);

  ctgFreeDbCache(dbCache);

D
dapan1121 已提交
1285
  CTG_ERR_RET(ctgMetaRentRemove(&pCtg->dbRent, dbCache->dbId, ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare));
D
dapan1121 已提交
1286 1287 1288 1289
  
  ctgDebug("db removed from rent, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId);

  if (taosHashRemove(pCtg->dbCache, dbFName, strlen(dbFName))) {
D
dapan1121 已提交
1290 1291
    ctgInfo("taosHashRemove from dbCache failed, may be removed, dbFName:%s", dbFName);
    CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
D
dapan1121 已提交
1292
  }
D
dapan 已提交
1293

D
dapan1121 已提交
1294 1295
  CTG_CACHE_STAT_SUB(dbNum, 1);

D
dapan 已提交
1296
  ctgInfo("db removed from cache, dbFName:%s, dbId:%"PRIx64, dbFName, dbId);
D
dapan1121 已提交
1297 1298 1299
  
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
1300 1301


D
dapan1121 已提交
1302
int32_t ctgGetAddDBCache(SCatalog* pCtg, const char *dbFName, uint64_t dbId, SCtgDBCache **pCache) {
D
dapan1121 已提交
1303 1304
  int32_t code = 0;
  SCtgDBCache *dbCache = NULL;
D
dapan1121 已提交
1305
  ctgGetDBCache(pCtg, dbFName, &dbCache);
D
dapan1121 已提交
1306
  
D
dapan1121 已提交
1307 1308
  if (dbCache) {
  // TODO OPEN IT
D
dapan1121 已提交
1309
#if 0    
D
dapan1121 已提交
1310 1311 1312 1313
    if (dbCache->dbId == dbId) {
      *pCache = dbCache;
      return TSDB_CODE_SUCCESS;
    }
D
dapan1121 已提交
1314
#else
D
dapan1121 已提交
1315 1316 1317
    if (0 == dbId) {
      *pCache = dbCache;
      return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1318 1319
    }

D
dapan1121 已提交
1320 1321 1322 1323 1324
    if (dbId && (dbCache->dbId == 0)) {
      dbCache->dbId = dbId;
      *pCache = dbCache;
      return TSDB_CODE_SUCCESS;
    }
D
dapan1121 已提交
1325
    
D
dapan1121 已提交
1326 1327 1328 1329 1330 1331
    if (dbCache->dbId == dbId) {
      *pCache = dbCache;
      return TSDB_CODE_SUCCESS;
    }
#endif
    CTG_ERR_RET(ctgRemoveDB(pCtg, dbCache, dbFName));
D
dapan1121 已提交
1332
  }
D
dapan1121 已提交
1333 1334
  
  CTG_ERR_RET(ctgAddNewDBCache(pCtg, dbFName, dbId));
D
dapan1121 已提交
1335

D
dapan1121 已提交
1336
  ctgGetDBCache(pCtg, dbFName, &dbCache);
D
dapan1121 已提交
1337

D
dapan1121 已提交
1338
  *pCache = dbCache;
D
dapan1121 已提交
1339

D
dapan1121 已提交
1340
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1341 1342 1343
}


D
dapan1121 已提交
1344 1345 1346
int32_t ctgUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SDBVgInfo** pDbInfo) {
  int32_t code = 0;
  SDBVgInfo* dbInfo = *pDbInfo;
D
dapan1121 已提交
1347 1348 1349 1350

  if (NULL == dbInfo->vgHash) {
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
1351
  
D
dapan1121 已提交
1352
  if (dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) {
D
dapan1121 已提交
1353 1354
    ctgError("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d, vgHashSize:%d", 
      dbFName, dbInfo->vgHash, dbInfo->vgVersion, taosHashGetSize(dbInfo->vgHash));
D
dapan1121 已提交
1355
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
1356 1357
  }

D
dapan1121 已提交
1358
  bool newAdded = false;
D
dapan 已提交
1359
  SDbVgVersion vgVersion = {.dbId = dbId, .vgVersion = dbInfo->vgVersion, .numOfTable = dbInfo->numOfTable};
D
dapan1121 已提交
1360 1361 1362 1363 1364 1365 1366 1367 1368 1369

  SCtgDBCache *dbCache = NULL;
  CTG_ERR_RET(ctgGetAddDBCache(pCtg, dbFName, dbId, &dbCache));
  if (NULL == dbCache) {
    ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:%"PRIx64, dbFName, dbId);
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  SDBVgInfo *vgInfo = NULL;
  CTG_ERR_RET(ctgWAcquireVgInfo(pCtg, dbCache));
D
dapan1121 已提交
1370
  
D
dapan1121 已提交
1371
  if (dbCache->vgInfo) {
D
dapan 已提交
1372 1373 1374 1375 1376 1377 1378 1379 1380
    if (dbInfo->vgVersion < dbCache->vgInfo->vgVersion) {
      ctgDebug("db vgVersion is old, dbFName:%s, vgVersion:%d, currentVersion:%d", dbFName, dbInfo->vgVersion, dbCache->vgInfo->vgVersion);
      ctgWReleaseVgInfo(dbCache);
      
      return TSDB_CODE_SUCCESS;
    }

    if (dbInfo->vgVersion == dbCache->vgInfo->vgVersion && dbInfo->numOfTable == dbCache->vgInfo->numOfTable) {
      ctgDebug("no new db vgVersion or numOfTable, dbFName:%s, vgVersion:%d, numOfTable:%d", dbFName, dbInfo->vgVersion, dbInfo->numOfTable);
D
dapan1121 已提交
1381
      ctgWReleaseVgInfo(dbCache);
D
dapan1121 已提交
1382
      
D
dapan1121 已提交
1383
      return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1384
    }
D
dapan1121 已提交
1385 1386

    ctgFreeVgInfo(dbCache->vgInfo);
D
dapan1121 已提交
1387 1388
  }

D
dapan1121 已提交
1389
  dbCache->vgInfo = dbInfo;
D
dapan1121 已提交
1390

D
dapan1121 已提交
1391
  *pDbInfo = NULL;
D
dapan1121 已提交
1392

D
dapan1121 已提交
1393
  ctgDebug("db vgInfo updated, dbFName:%s, vgVersion:%d, dbId:%"PRIx64, dbFName, vgVersion.vgVersion, vgVersion.dbId);
D
dapan1121 已提交
1394

D
dapan1121 已提交
1395
  ctgWReleaseVgInfo(dbCache);
D
dapan1121 已提交
1396

D
dapan1121 已提交
1397
  dbCache = NULL;
D
dapan1121 已提交
1398

D
dapan1121 已提交
1399
  strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName));
D
dapan1121 已提交
1400
  CTG_ERR_RET(ctgMetaRentUpdate(&pCtg->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion), ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare));
D
dapan1121 已提交
1401
  
D
dapan1121 已提交
1402 1403 1404 1405
  CTG_RET(code);
}


D
dapan1121 已提交
1406 1407
int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, uint64_t dbId, char *tbName, STableMeta *meta, int32_t metaSize) {
  SCtgTbMetaCache *tbCache = &dbCache->tbCache;
D
dapan1121 已提交
1408

D
dapan1121 已提交
1409 1410 1411 1412 1413
  CTG_LOCK(CTG_READ, &tbCache->metaLock);
  if (dbCache->deleted || NULL == tbCache->metaCache || NULL == tbCache->stbCache) {
    CTG_UNLOCK(CTG_READ, &tbCache->metaLock);    
    ctgError("db is dropping, dbId:%"PRIx64, dbCache->dbId);
    CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
D
dapan1121 已提交
1414 1415
  }

D
dapan1121 已提交
1416 1417 1418 1419 1420 1421 1422
  int8_t origType = 0;
  uint64_t origSuid = 0;
  bool isStb = meta->tableType == TSDB_SUPER_TABLE;
  STableMeta *orig = taosHashGet(tbCache->metaCache, tbName, strlen(tbName));
  if (orig) {
    origType = orig->tableType;
    
D
dapan1121 已提交
1423 1424 1425 1426 1427
    if (origType == TSDB_SUPER_TABLE) {
      if ((!isStb) || orig->suid != meta->suid) {
        CTG_LOCK(CTG_WRITE, &tbCache->stbLock);
        if (taosHashRemove(tbCache->stbCache, &orig->suid, sizeof(orig->suid))) {
          ctgError("stb not exist in stbCache, dbFName:%s, stb:%s, suid:%"PRIx64, dbFName, tbName, orig->suid);
D
dapan1121 已提交
1428 1429
        } else {
          CTG_CACHE_STAT_SUB(stblNum, 1);
D
dapan1121 已提交
1430 1431 1432 1433 1434
        }
        CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);

        ctgDebug("stb removed from stbCache, dbFName:%s, stb:%s, suid:%"PRIx64, dbFName, tbName, orig->suid);
        
D
dapan1121 已提交
1435
        ctgMetaRentRemove(&pCtg->stbRent, orig->suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare);
D
dapan1121 已提交
1436
      }
D
dapan1121 已提交
1437

D
dapan1121 已提交
1438
      origSuid = orig->suid;
D
dapan1121 已提交
1439
    }
D
dapan1121 已提交
1440
  }
D
dapan1121 已提交
1441

D
dapan1121 已提交
1442 1443
  if (isStb) {
    CTG_LOCK(CTG_WRITE, &tbCache->stbLock);
D
dapan1121 已提交
1444
  }
D
dapan1121 已提交
1445
  
D
dapan1121 已提交
1446 1447 1448 1449 1450 1451 1452 1453 1454
  if (taosHashPut(tbCache->metaCache, tbName, strlen(tbName), meta, metaSize) != 0) {
    if (isStb) {
      CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
    }
    
    CTG_UNLOCK(CTG_READ, &tbCache->metaLock);  
    ctgError("taosHashPut tbmeta to cache failed, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
  }
D
dapan1121 已提交
1455

D
dapan1121 已提交
1456 1457 1458 1459
  if (NULL == orig) {
    CTG_CACHE_STAT_ADD(tblNum, 1);
  }

D
dapan 已提交
1460
  ctgDebug("tbmeta updated to cache, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
D
dapan1121 已提交
1461
  ctgdShowTableMeta(pCtg, tbName, meta);
D
dapan 已提交
1462

D
dapan1121 已提交
1463 1464 1465
  if (!isStb) {
    CTG_UNLOCK(CTG_READ, &tbCache->metaLock);  
    return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1466
  }
D
dapan1121 已提交
1467

D
dapan1121 已提交
1468
  if (origType == TSDB_SUPER_TABLE && origSuid == meta->suid) {
D
dapan1121 已提交
1469 1470 1471 1472
    CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
    CTG_UNLOCK(CTG_READ, &tbCache->metaLock);  
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
1473

D
dapan1121 已提交
1474 1475 1476 1477
  STableMeta *tbMeta = taosHashGet(tbCache->metaCache, tbName, strlen(tbName));
  if (taosHashPut(tbCache->stbCache, &meta->suid, sizeof(meta->suid), &tbMeta, POINTER_BYTES) != 0) {
    CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
    CTG_UNLOCK(CTG_READ, &tbCache->metaLock);    
H
Haojun Liao 已提交
1478
    ctgError("taosHashPut stable to stable cache failed, suid:%"PRIx64, meta->suid);
D
dapan1121 已提交
1479
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
1480
  }
D
dapan1121 已提交
1481 1482

  CTG_CACHE_STAT_ADD(stblNum, 1);
D
dapan1121 已提交
1483 1484
  
  CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock);
D
dapan1121 已提交
1485

D
dapan1121 已提交
1486 1487
  CTG_UNLOCK(CTG_READ, &tbCache->metaLock);

D
dapan 已提交
1488
  ctgDebug("stb updated to stbCache, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
D
dapan1121 已提交
1489 1490 1491 1492 1493 1494 1495

  SSTableMetaVersion metaRent = {.dbId = dbId, .suid = meta->suid, .sversion = meta->sversion, .tversion = meta->tversion};
  strcpy(metaRent.dbFName, dbFName);
  strcpy(metaRent.stbName, tbName);
  CTG_ERR_RET(ctgMetaRentAdd(&pCtg->stbRent, &metaRent, metaRent.suid, sizeof(SSTableMetaVersion)));
  
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1496 1497
}

D
dapan 已提交
1498
int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst) {
wafwerar's avatar
wafwerar 已提交
1499
  *dst = taosMemoryMalloc(sizeof(SDBVgInfo));
D
dapan 已提交
1500 1501 1502 1503 1504 1505 1506 1507 1508 1509 1510
  if (NULL == *dst) {
    qError("malloc %d failed", (int32_t)sizeof(SDBVgInfo));
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
  }

  memcpy(*dst, src, sizeof(SDBVgInfo));

  size_t hashSize = taosHashGetSize(src->vgHash);
  (*dst)->vgHash = taosHashInit(hashSize, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (NULL == (*dst)->vgHash) {
    qError("taosHashInit %d failed", (int32_t)hashSize);
wafwerar's avatar
wafwerar 已提交
1511
    taosMemoryFreeClear(*dst);
D
dapan 已提交
1512 1513 1514 1515 1516 1517
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
  }

  int32_t *vgId = NULL;
  void *pIter = taosHashIterate(src->vgHash, NULL);
  while (pIter) {
H
Haojun Liao 已提交
1518
    vgId = taosHashGetKey(pIter, NULL);
D
dapan 已提交
1519 1520 1521 1522 1523

    if (taosHashPut((*dst)->vgHash, (void *)vgId, sizeof(int32_t), pIter, sizeof(SVgroupInfo))) {
      qError("taosHashPut failed, hashSize:%d", (int32_t)hashSize);
      taosHashCancelIterate(src->vgHash, pIter);
      taosHashCleanup((*dst)->vgHash);
wafwerar's avatar
wafwerar 已提交
1524
      taosMemoryFreeClear(*dst);
D
dapan 已提交
1525 1526 1527 1528 1529 1530 1531 1532 1533 1534 1535 1536
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }
    
    pIter = taosHashIterate(src->vgHash, pIter);
  }


  return TSDB_CODE_SUCCESS;
}



D
dapan1121 已提交
1537
int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, SCtgDBCache** dbCache, SDBVgInfo **pInfo) {
D
dapan1121 已提交
1538
  bool inCache = false;
D
dapan1121 已提交
1539
  int32_t code = 0;
D
dapan1121 已提交
1540 1541 1542

  CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, dbCache, &inCache));

D
dapan1121 已提交
1543
  if (inCache) {
D
dapan1121 已提交
1544
    return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1545 1546 1547 1548 1549
  }

  SUseDbOutput DbOut = {0};
  SBuildUseDBInput input = {0};

D
dapan1121 已提交
1550
  tstrncpy(input.db, dbFName, tListLen(input.db));
D
dapan1121 已提交
1551
  input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
H
Haojun Liao 已提交
1552

D
dapan1121 已提交
1553 1554 1555 1556 1557 1558 1559 1560 1561
  code = ctgGetDBVgInfoFromMnode(pCtg, pRpc, pMgmtEps, &input, &DbOut);
  if (code) {
    if (CTG_DB_NOT_EXIST(code) && input.vgVersion > CTG_DEFAULT_INVALID_VERSION) {
      ctgDebug("db no longer exist, dbFName:%s, dbId:%" PRIx64, input.db, input.dbId);
      ctgPushRmDBMsgInQueue(pCtg, input.db, input.dbId);
    }

    CTG_ERR_RET(code);
  }
D
dapan1121 已提交
1562

D
dapan 已提交
1563
  CTG_ERR_JRET(ctgCloneVgInfo(DbOut.dbVgroup, pInfo));
D
dapan1121 已提交
1564

D
dapan1121 已提交
1565
  CTG_ERR_RET(ctgPushUpdateVgMsgInQueue(pCtg, dbFName, DbOut.dbId, DbOut.dbVgroup, false));
D
dapan 已提交
1566

D
dapan1121 已提交
1567
  return TSDB_CODE_SUCCESS;
D
dapan 已提交
1568 1569 1570

_return:

wafwerar's avatar
wafwerar 已提交
1571
  taosMemoryFreeClear(*pInfo);
D
dapan 已提交
1572 1573 1574
  *pInfo = DbOut.dbVgroup;
  
  CTG_RET(code);
D
dapan1121 已提交
1575 1576
}

D
dapan1121 已提交
1577 1578 1579
int32_t ctgRefreshDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName) {
  bool inCache = false;
  int32_t code = 0;
D
dapan1121 已提交
1580
  SCtgDBCache* dbCache = NULL;
D
dapan1121 已提交
1581

D
dapan1121 已提交
1582
  CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache, &inCache));
D
dapan1121 已提交
1583 1584 1585 1586 1587 1588

  SUseDbOutput DbOut = {0};
  SBuildUseDBInput input = {0};
  tstrncpy(input.db, dbFName, tListLen(input.db));

  if (inCache) {
D
dapan1121 已提交
1589
    input.dbId = dbCache->dbId;
D
dapan1121 已提交
1590

D
dapan1121 已提交
1591 1592
    ctgReleaseVgInfo(dbCache);
    ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
1593
  }
D
dapan1121 已提交
1594 1595 1596
  
  input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
  input.numOfTable = 0;
D
dapan1121 已提交
1597 1598 1599

  code = ctgGetDBVgInfoFromMnode(pCtg, pRpc, pMgmtEps, &input, &DbOut);
  if (code) {
D
dapan1121 已提交
1600
    if (CTG_DB_NOT_EXIST(code) && inCache) {
D
dapan1121 已提交
1601 1602 1603 1604 1605 1606 1607 1608 1609 1610 1611 1612 1613
      ctgDebug("db no longer exist, dbFName:%s, dbId:%" PRIx64, input.db, input.dbId);
      ctgPushRmDBMsgInQueue(pCtg, input.db, input.dbId);
    }

    CTG_ERR_RET(code);
  }

  CTG_ERR_RET(ctgPushUpdateVgMsgInQueue(pCtg, dbFName, DbOut.dbId, DbOut.dbVgroup, true));

  return TSDB_CODE_SUCCESS;
}


D
dapan 已提交
1614

D
dapan1121 已提交
1615
int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput) {
wafwerar's avatar
wafwerar 已提交
1616
  *pOutput = taosMemoryMalloc(sizeof(STableMetaOutput));
D
dapan1121 已提交
1617 1618 1619
  if (NULL == *pOutput) {
    qError("malloc %d failed", (int32_t)sizeof(STableMetaOutput));
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan 已提交
1620 1621
  }

D
dapan1121 已提交
1622 1623 1624 1625
  memcpy(*pOutput, output, sizeof(STableMetaOutput));

  if (output->tbMeta) {
    int32_t metaSize = CTG_META_SIZE(output->tbMeta);
wafwerar's avatar
wafwerar 已提交
1626
    (*pOutput)->tbMeta = taosMemoryMalloc(metaSize);
D
dapan1121 已提交
1627
    if (NULL == (*pOutput)->tbMeta) {
D
dapan 已提交
1628
      qError("malloc %d failed", (int32_t)sizeof(STableMetaOutput));
wafwerar's avatar
wafwerar 已提交
1629
      taosMemoryFreeClear(*pOutput);
D
dapan1121 已提交
1630 1631 1632 1633
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }

    memcpy((*pOutput)->tbMeta, output->tbMeta, metaSize);
D
dapan 已提交
1634 1635
  }

D
dapan1121 已提交
1636 1637 1638
  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
1639 1640


D
dapan1121 已提交
1641
int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, int32_t flag, STableMetaOutput **pOutput, bool syncReq) {
D
dapan1121 已提交
1642
  if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pTableName) {
D
dapan1121 已提交
1643 1644 1645 1646 1647 1648
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  SVgroupInfo vgroupInfo = {0};
  int32_t code = 0;

D
dapan1121 已提交
1649
  if (!CTG_FLAG_IS_INF_DB(flag)) {
D
dapan1121 已提交
1650 1651
    CTG_ERR_RET(catalogGetTableHashVgroup(pCtg, pTrans, pMgmtEps, pTableName, &vgroupInfo));
  }
D
dapan1121 已提交
1652

D
dapan1121 已提交
1653
  STableMetaOutput  moutput = {0};
wafwerar's avatar
wafwerar 已提交
1654
  STableMetaOutput *output = taosMemoryCalloc(1, sizeof(STableMetaOutput));
D
dapan1121 已提交
1655 1656 1657 1658
  if (NULL == output) {
    ctgError("malloc %d failed", (int32_t)sizeof(STableMetaOutput));
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
  }
D
dapan1121 已提交
1659

D
dapan1121 已提交
1660
  if (CTG_FLAG_IS_INF_DB(flag)) {
D
dapan1121 已提交
1661 1662 1663
    ctgDebug("will refresh tbmeta, supposed in information_schema, tbName:%s", tNameGetTableName(pTableName));

    CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCtg, pTrans, pMgmtEps, (char *)pTableName->dbname, (char *)pTableName->tname, output));
D
dapan1121 已提交
1664
  } else if (CTG_FLAG_IS_STB(flag)) {
D
dapan 已提交
1665
    ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s", tNameGetTableName(pTableName));
D
dapan1121 已提交
1666 1667

    // if get from mnode failed, will not try vnode
D
dapan1121 已提交
1668
    CTG_ERR_JRET(ctgGetTableMetaFromMnode(pCtg, pTrans, pMgmtEps, pTableName, output));
D
dapan1121 已提交
1669

D
dapan1121 已提交
1670
    if (CTG_IS_META_NULL(output->metaType)) {
D
dapan1121 已提交
1671
      CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCtg, pTrans, pMgmtEps, pTableName, &vgroupInfo, output));
D
dapan1121 已提交
1672 1673
    }
  } else {
D
dapan1121 已提交
1674
    ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pTableName), flag);
D
dapan1121 已提交
1675 1676

    // if get from vnode failed or no table meta, will not try mnode
D
dapan1121 已提交
1677
    CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCtg, pTrans, pMgmtEps, pTableName, &vgroupInfo, output));
D
dapan1121 已提交
1678

D
dapan1121 已提交
1679
    if (CTG_IS_META_TABLE(output->metaType) && TSDB_SUPER_TABLE == output->tbMeta->tableType) {
D
dapan1121 已提交
1680
      ctgDebug("will continue to refresh tbmeta since got stb, tbName:%s", tNameGetTableName(pTableName));
D
dapan1121 已提交
1681

wafwerar's avatar
wafwerar 已提交
1682
      taosMemoryFreeClear(output->tbMeta);
D
dapan1121 已提交
1683
      
D
dapan1121 已提交
1684
      CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCtg, pTrans, pMgmtEps, output->dbFName, output->tbName, output));
D
dapan1121 已提交
1685
    } else if (CTG_IS_META_BOTH(output->metaType)) {
D
dapan1121 已提交
1686
      int32_t exist = 0;
D
dapan1121 已提交
1687 1688 1689
      if (!CTG_FLAG_IS_FORCE_UPDATE(flag)) {
        CTG_ERR_JRET(ctgIsTableMetaExistInCache(pCtg, output->dbFName, output->tbName, &exist));
      }
H
Haojun Liao 已提交
1690

D
dapan1121 已提交
1691
      if (0 == exist) {
D
dapan1121 已提交
1692
        CTG_ERR_JRET(ctgGetTableMetaFromMnodeImpl(pCtg, pTrans, pMgmtEps, output->dbFName, output->tbName, &moutput));
D
dapan1121 已提交
1693

D
dapan1121 已提交
1694
        if (CTG_IS_META_NULL(moutput.metaType)) {
D
dapan1121 已提交
1695
          SET_META_TYPE_NULL(output->metaType);
D
dapan1121 已提交
1696 1697
        }
        
wafwerar's avatar
wafwerar 已提交
1698
        taosMemoryFreeClear(output->tbMeta);
D
dapan1121 已提交
1699
        output->tbMeta = moutput.tbMeta;
D
dapan1121 已提交
1700 1701
        moutput.tbMeta = NULL;
      } else {
wafwerar's avatar
wafwerar 已提交
1702
        taosMemoryFreeClear(output->tbMeta);
D
dapan1121 已提交
1703
        
D
dapan1121 已提交
1704
        SET_META_TYPE_CTABLE(output->metaType); 
D
dapan1121 已提交
1705
      }
D
dapan1121 已提交
1706 1707 1708
    }
  }

D
dapan1121 已提交
1709
  if (CTG_IS_META_NULL(output->metaType)) {
D
dapan 已提交
1710
    ctgError("no tbmeta got, tbNmae:%s", tNameGetTableName(pTableName));
D
dapan1121 已提交
1711
    catalogRemoveTableMeta(pCtg, pTableName);
D
dapan1121 已提交
1712 1713 1714
    CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
  }

D
dapan 已提交
1715 1716 1717 1718 1719 1720
  if (CTG_IS_META_TABLE(output->metaType)) {
    ctgDebug("tbmeta got, dbFName:%s, tbName:%s, tbType:%d", output->dbFName, output->tbName, output->tbMeta->tableType);
  } else {
    ctgDebug("tbmeta got, dbFName:%s, tbName:%s, tbType:%d, stbMetaGot:%d", output->dbFName, output->ctbName, output->ctbMeta.tableType, CTG_IS_META_BOTH(output->metaType));
  }

D
dapan1121 已提交
1721 1722 1723 1724
  if (pOutput) {
    CTG_ERR_JRET(ctgCloneMetaOutput(output, pOutput));
  }

D
dapan1121 已提交
1725
  CTG_ERR_JRET(ctgPushUpdateTblMsgInQueue(pCtg, output, syncReq));
D
dapan 已提交
1726

D
dapan1121 已提交
1727
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
1728 1729 1730

_return:

wafwerar's avatar
wafwerar 已提交
1731 1732
  taosMemoryFreeClear(output->tbMeta);
  taosMemoryFreeClear(output);
D
dapan1121 已提交
1733 1734 1735 1736
  
  CTG_RET(code);
}

D
dapan1121 已提交
1737
int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t flag) {
D
dapan1121 已提交
1738
  if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pTableMeta) {
D
dapan1121 已提交
1739 1740 1741
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }
  
D
dapan1121 已提交
1742
  bool inCache = false;
D
dapan1121 已提交
1743
  int32_t code = 0;
D
dapan1121 已提交
1744 1745 1746
  uint64_t dbId = 0;
  uint64_t suid = 0;
  STableMetaOutput *output = NULL;
D
dapan1121 已提交
1747

D
dapan1121 已提交
1748
  if (CTG_IS_INF_DBNAME(pTableName->dbname)) {
D
dapan1121 已提交
1749
    CTG_FLAG_SET_INF_DB(flag);
D
dapan1121 已提交
1750 1751
  }

D
dapan1121 已提交
1752
  CTG_ERR_RET(ctgGetTableMetaFromCache(pCtg, pTableName, pTableMeta, &inCache, flag, &dbId));
D
dapan1121 已提交
1753

D
dapan1121 已提交
1754 1755
  int32_t tbType = 0;

D
dapan1121 已提交
1756
  if (inCache) {
D
dapan1121 已提交
1757 1758
    if (CTG_FLAG_MATCH_STB(flag, (*pTableMeta)->tableType) && ((!CTG_FLAG_IS_FORCE_UPDATE(flag)) || (CTG_FLAG_IS_INF_DB(flag)))) {
      goto _return;
D
dapan1121 已提交
1759
    }
D
dapan1121 已提交
1760

D
dapan1121 已提交
1761 1762
    tbType = (*pTableMeta)->tableType;
    suid = (*pTableMeta)->suid;
D
dapan1121 已提交
1763

wafwerar's avatar
wafwerar 已提交
1764
    taosMemoryFreeClear(*pTableMeta);
D
dapan1121 已提交
1765
  }
D
dapan1121 已提交
1766

D
dapan1121 已提交
1767 1768
  if (CTG_FLAG_IS_UNKNOWN_STB(flag)) {
    CTG_FLAG_SET_STB(flag, tbType);
D
dapan1121 已提交
1769 1770
  }

D
dapan1121 已提交
1771

D
dapan 已提交
1772
  while (true) {
D
dapan1121 已提交
1773
    CTG_ERR_JRET(ctgRefreshTblMeta(pCtg, pRpc, pMgmtEps, pTableName, flag, &output, false));
D
dapan1121 已提交
1774

D
dapan 已提交
1775 1776 1777 1778
    if (CTG_IS_META_TABLE(output->metaType)) {
      *pTableMeta = output->tbMeta;
      goto _return;
    }
D
dapan1121 已提交
1779

D
dapan 已提交
1780 1781 1782 1783 1784 1785
    if (CTG_IS_META_BOTH(output->metaType)) {
      memcpy(output->tbMeta, &output->ctbMeta, sizeof(output->ctbMeta));
      
      *pTableMeta = output->tbMeta;
      goto _return;
    }
D
dapan1121 已提交
1786

D
dapan 已提交
1787 1788
    if ((!CTG_IS_META_CTABLE(output->metaType)) || output->tbMeta) {
      ctgError("invalid metaType:%d", output->metaType);
wafwerar's avatar
wafwerar 已提交
1789
      taosMemoryFreeClear(output->tbMeta);
D
dapan 已提交
1790 1791
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
    }
D
dapan1121 已提交
1792

D
dapan 已提交
1793
    // HANDLE ONLY CHILD TABLE META
D
dapan1121 已提交
1794

D
dapan 已提交
1795 1796 1797
    SName stbName = *pTableName;
    strcpy(stbName.tname, output->tbName);
    
D
dapan1121 已提交
1798 1799
    CTG_ERR_JRET(ctgGetTableMetaFromCache(pCtg, &stbName, pTableMeta, &inCache, flag, NULL));
    if (!inCache) {
D
dapan 已提交
1800 1801 1802
      ctgDebug("stb no longer exist, dbFName:%s, tbName:%s", output->dbFName, pTableName->tname);
      continue;
    }
D
dapan1121 已提交
1803

D
dapan 已提交
1804 1805 1806 1807
    memcpy(*pTableMeta, &output->ctbMeta, sizeof(output->ctbMeta));

    break;
  }
D
dapan1121 已提交
1808 1809 1810

_return:

D
dapan1121 已提交
1811
  if (CTG_TABLE_NOT_EXIST(code) && inCache) {
D
dapan1121 已提交
1812 1813 1814 1815 1816 1817 1818 1819
    char dbFName[TSDB_DB_FNAME_LEN] = {0};
    if (CTG_FLAG_IS_INF_DB(flag)) {
      strcpy(dbFName, pTableName->dbname);
    } else {
      tNameGetFullDbName(pTableName, dbFName);
    }

    if (TSDB_SUPER_TABLE == tbType) {
D
dapan1121 已提交
1820
      ctgPushRmStbMsgInQueue(pCtg, dbFName, dbId, pTableName->tname, suid, false);
D
dapan1121 已提交
1821
    } else {
D
dapan1121 已提交
1822
      ctgPushRmTblMsgInQueue(pCtg, dbFName, dbId, pTableName->tname, false);
D
dapan1121 已提交
1823 1824 1825
    }
  }

wafwerar's avatar
wafwerar 已提交
1826
  taosMemoryFreeClear(output);
D
dapan1121 已提交
1827

D
dapan 已提交
1828 1829
  if (*pTableMeta) {
    ctgDebug("tbmeta returned, tbName:%s, tbType:%d", pTableName->tname, (*pTableMeta)->tableType);
D
dapan1121 已提交
1830
    ctgdShowTableMeta(pCtg, pTableName->tname, *pTableMeta);
D
dapan 已提交
1831 1832
  }

D
dapan1121 已提交
1833 1834 1835 1836 1837 1838 1839 1840 1841 1842 1843 1844 1845
  CTG_RET(code);
}



int32_t ctgActUpdateVg(SCtgMetaAction *action) {
  int32_t code = 0;
  SCtgUpdateVgMsg *msg = action->data;
  
  CTG_ERR_JRET(ctgUpdateDBVgInfo(msg->pCtg, msg->dbFName, msg->dbId, &msg->dbInfo));

_return:

D
dapan1121 已提交
1846
  ctgFreeVgInfo(msg->dbInfo);
wafwerar's avatar
wafwerar 已提交
1847
  taosMemoryFreeClear(msg);
D
dapan1121 已提交
1848 1849 1850 1851 1852 1853 1854 1855 1856 1857
  
  CTG_RET(code);
}

int32_t ctgActRemoveDB(SCtgMetaAction *action) {
  int32_t code = 0;
  SCtgRemoveDBMsg *msg = action->data;
  SCatalog* pCtg = msg->pCtg;

  SCtgDBCache *dbCache = NULL;
D
dapan 已提交
1858
  ctgGetDBCache(msg->pCtg, msg->dbFName, &dbCache);
D
dapan1121 已提交
1859 1860 1861 1862 1863 1864 1865 1866 1867 1868 1869 1870 1871
  if (NULL == dbCache) {
    goto _return;
  }
  
  if (dbCache->dbId != msg->dbId) {
    ctgInfo("dbId already updated, dbFName:%s, dbId:%"PRIx64 ", targetId:%"PRIx64, msg->dbFName, dbCache->dbId, msg->dbId);
    goto _return;
  }
  
  CTG_ERR_JRET(ctgRemoveDB(pCtg, dbCache, msg->dbFName));

_return:

wafwerar's avatar
wafwerar 已提交
1872
  taosMemoryFreeClear(msg);
D
dapan1121 已提交
1873 1874 1875 1876 1877 1878 1879 1880 1881 1882 1883 1884 1885 1886
  
  CTG_RET(code);
}


int32_t ctgActUpdateTbl(SCtgMetaAction *action) {
  int32_t code = 0;
  SCtgUpdateTblMsg *msg = action->data;
  SCatalog* pCtg = msg->pCtg;
  STableMetaOutput* output = msg->output;
  SCtgDBCache *dbCache = NULL;

  if ((!CTG_IS_META_CTABLE(output->metaType)) && NULL == output->tbMeta) {
    ctgError("no valid tbmeta got from meta rsp, dbFName:%s, tbName:%s", output->dbFName, output->tbName);
D
dapan 已提交
1887
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
1888 1889 1890 1891 1892 1893
  }

  if (CTG_IS_META_BOTH(output->metaType) && TSDB_SUPER_TABLE != output->tbMeta->tableType) {
    ctgError("table type error, expected:%d, actual:%d", TSDB_SUPER_TABLE, output->tbMeta->tableType);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }    
D
dapan1121 已提交
1894
  
D
dapan1121 已提交
1895 1896 1897 1898 1899 1900 1901 1902 1903 1904 1905 1906 1907 1908 1909 1910 1911 1912
  CTG_ERR_JRET(ctgGetAddDBCache(pCtg, output->dbFName, output->dbId, &dbCache));
  if (NULL == dbCache) {
    ctgInfo("conflict db update, ignore this update, dbFName:%s, dbId:%"PRIx64, output->dbFName, output->dbId);
    CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  if (CTG_IS_META_TABLE(output->metaType) || CTG_IS_META_BOTH(output->metaType)) {
    int32_t metaSize = CTG_META_SIZE(output->tbMeta);
    
    CTG_ERR_JRET(ctgUpdateTblMeta(pCtg, dbCache, output->dbFName, output->dbId, output->tbName, output->tbMeta, metaSize));
  }

  if (CTG_IS_META_CTABLE(output->metaType) || CTG_IS_META_BOTH(output->metaType)) {
    CTG_ERR_JRET(ctgUpdateTblMeta(pCtg, dbCache, output->dbFName, output->dbId, output->ctbName, (STableMeta *)&output->ctbMeta, sizeof(output->ctbMeta)));
  }

_return:

D
dapan 已提交
1913
  if (output) {
wafwerar's avatar
wafwerar 已提交
1914 1915
    taosMemoryFreeClear(output->tbMeta);
    taosMemoryFreeClear(output);
D
dapan1121 已提交
1916
  }
D
dapan 已提交
1917
  
wafwerar's avatar
wafwerar 已提交
1918
  taosMemoryFreeClear(msg);
D
dapan1121 已提交
1919 1920 1921 1922 1923 1924 1925 1926 1927 1928 1929 1930 1931 1932 1933 1934
  
  CTG_RET(code);
}


int32_t ctgActRemoveStb(SCtgMetaAction *action) {
  int32_t code = 0;
  SCtgRemoveStbMsg *msg = action->data;
  SCatalog* pCtg = msg->pCtg;

  SCtgDBCache *dbCache = NULL;
  ctgGetDBCache(pCtg, msg->dbFName, &dbCache);
  if (NULL == dbCache) {
    return TSDB_CODE_SUCCESS;
  }

D
dapan 已提交
1935
  if (msg->dbId && (dbCache->dbId != msg->dbId)) {
D
dapan1121 已提交
1936 1937 1938 1939 1940 1941 1942
    ctgDebug("dbId already modified, dbFName:%s, current:%"PRIx64", dbId:%"PRIx64", stb:%s, suid:%"PRIx64, msg->dbFName, dbCache->dbId, msg->dbId, msg->stbName, msg->suid);
    return TSDB_CODE_SUCCESS;
  }
  
  CTG_LOCK(CTG_WRITE, &dbCache->tbCache.stbLock);
  if (taosHashRemove(dbCache->tbCache.stbCache, &msg->suid, sizeof(msg->suid))) {
    ctgDebug("stb not exist in stbCache, may be removed, dbFName:%s, stb:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid);
D
dapan1121 已提交
1943 1944
  } else {
    CTG_CACHE_STAT_SUB(stblNum, 1);
D
dapan1121 已提交
1945 1946 1947 1948 1949
  }

  CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);
  if (taosHashRemove(dbCache->tbCache.metaCache, msg->stbName, strlen(msg->stbName))) {  
    ctgError("stb not exist in cache, dbFName:%s, stb:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid);
D
dapan1121 已提交
1950 1951 1952
  } else {
    CTG_CACHE_STAT_SUB(tblNum, 1);
  }
D
dapan1121 已提交
1953 1954 1955 1956 1957 1958
  CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
  
  CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock);
  
  ctgInfo("stb removed from cache, dbFName:%s, stbName:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid);

D
dapan1121 已提交
1959
  CTG_ERR_JRET(ctgMetaRentRemove(&msg->pCtg->stbRent, msg->suid, ctgStbVersionSortCompare, ctgStbVersionSearchCompare));
D
dapan1121 已提交
1960 1961 1962 1963 1964
  
  ctgDebug("stb removed from rent, dbFName:%s, stbName:%s, suid:%"PRIx64, msg->dbFName, msg->stbName, msg->suid);
  
_return:

wafwerar's avatar
wafwerar 已提交
1965
  taosMemoryFreeClear(msg);
D
dapan1121 已提交
1966 1967 1968 1969 1970
  
  CTG_RET(code);
}

int32_t ctgActRemoveTbl(SCtgMetaAction *action) {
D
dapan1121 已提交
1971 1972 1973 1974 1975 1976 1977 1978 1979 1980 1981 1982 1983 1984
  int32_t code = 0;
  SCtgRemoveTblMsg *msg = action->data;
  SCatalog* pCtg = msg->pCtg;

  SCtgDBCache *dbCache = NULL;
  ctgGetDBCache(pCtg, msg->dbFName, &dbCache);
  if (NULL == dbCache) {
    return TSDB_CODE_SUCCESS;
  }

  if (dbCache->dbId != msg->dbId) {
    ctgDebug("dbId already modified, dbFName:%s, current:%"PRIx64", dbId:%"PRIx64", tbName:%s", msg->dbFName, dbCache->dbId, msg->dbId, msg->tbName);
    return TSDB_CODE_SUCCESS;
  }
H
Haojun Liao 已提交
1985

D
dapan1121 已提交
1986
  CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock);
H
Haojun Liao 已提交
1987
  if (taosHashRemove(dbCache->tbCache.metaCache, msg->tbName, strlen(msg->tbName))) {
D
dapan1121 已提交
1988 1989 1990
    CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
    ctgError("stb not exist in cache, dbFName:%s, tbName:%s", msg->dbFName, msg->tbName);
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
1991 1992
  } else {
    CTG_CACHE_STAT_SUB(tblNum, 1);
H
Haojun Liao 已提交
1993
  }
D
dapan1121 已提交
1994
  CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
D
dapan1121 已提交
1995

D
dapan1121 已提交
1996
  ctgInfo("table removed from cache, dbFName:%s, tbName:%s", msg->dbFName, msg->tbName);
H
Haojun Liao 已提交
1997

D
dapan1121 已提交
1998
_return:
D
dapan1121 已提交
1999

wafwerar's avatar
wafwerar 已提交
2000
  taosMemoryFreeClear(msg);
H
Haojun Liao 已提交
2001

D
dapan1121 已提交
2002
  CTG_RET(code);
D
dapan1121 已提交
2003 2004 2005 2006 2007 2008 2009 2010
}


void* ctgUpdateThreadFunc(void* param) {
  setThreadName("catalog");

  qInfo("catalog update thread started");

D
dapan 已提交
2011
  CTG_LOCK(CTG_READ, &gCtgMgmt.lock);
D
dapan1121 已提交
2012 2013
  
  while (true) {
D
dapan1121 已提交
2014 2015 2016
    if (tsem_wait(&gCtgMgmt.queue.reqSem)) {
      qError("ctg tsem_wait failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
    }
D
dapan1121 已提交
2017
    
wafwerar's avatar
wafwerar 已提交
2018
    if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) {
D
dapan1121 已提交
2019
      tsem_post(&gCtgMgmt.queue.rspSem);
D
dapan1121 已提交
2020 2021 2022 2023 2024
      break;
    }

    SCtgMetaAction *action = NULL;
    ctgPopAction(&action);
D
dapan1121 已提交
2025
    SCatalog *pCtg = ((SCtgUpdateMsgHeader *)action->data)->pCtg;
D
dapan1121 已提交
2026

D
dapan1121 已提交
2027
    ctgDebug("process [%s] action", gCtgAction[action->act].name);
D
dapan 已提交
2028 2029 2030
    
    (*gCtgAction[action->act].func)(action);

D
dapan1121 已提交
2031 2032 2033 2034 2035 2036
    gCtgMgmt.queue.seqDone = action->seqId;

    if (action->syncReq) {
      tsem_post(&gCtgMgmt.queue.rspSem);
    }

D
dapan1121 已提交
2037
    CTG_RUNTIME_STAT_ADD(qDoneNum, 1); 
D
dapan1121 已提交
2038

D
dapan1121 已提交
2039
    ctgdShowClusterCache(pCtg);
D
dapan1121 已提交
2040 2041
  }

D
dapan 已提交
2042
  CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock);
D
dapan1121 已提交
2043 2044 2045 2046 2047 2048 2049 2050

  qInfo("catalog update thread stopped");
  
  return NULL;
}


int32_t ctgStartUpdateThread() {
wafwerar's avatar
wafwerar 已提交
2051 2052 2053
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
D
dapan1121 已提交
2054

wafwerar's avatar
wafwerar 已提交
2055
  if (taosThreadCreate(&gCtgMgmt.updateThread, &thAttr, ctgUpdateThreadFunc, NULL) != 0) {
D
dapan1121 已提交
2056 2057
    terrno = TAOS_SYSTEM_ERROR(errno);
    CTG_ERR_RET(terrno);
D
dapan1121 已提交
2058 2059
  }
  
wafwerar's avatar
wafwerar 已提交
2060
  taosThreadAttrDestroy(&thAttr);
D
dapan1121 已提交
2061 2062 2063
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
2064 2065 2066 2067 2068 2069 2070 2071 2072 2073 2074 2075 2076 2077 2078 2079
int32_t ctgGetTableDistVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgList) {
  STableMeta *tbMeta = NULL;
  int32_t code = 0;
  SVgroupInfo vgroupInfo = {0};
  SCtgDBCache* dbCache = NULL;
  SArray *vgList = NULL;
  SDBVgInfo *vgInfo = NULL;

  *pVgList = NULL;
  
  CTG_ERR_JRET(ctgGetTableMeta(pCtg, pRpc, pMgmtEps, pTableName, &tbMeta, CTG_FLAG_UNKNOWN_STB));

  char db[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, db);

  SHashObj *vgHash = NULL;  
D
dapan1121 已提交
2080
  CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pRpc, pMgmtEps, db, &dbCache, &vgInfo));
D
dapan1121 已提交
2081 2082 2083 2084 2085 2086 2087 2088 2089 2090 2091 2092 2093 2094 2095 2096 2097 2098 2099 2100 2101 2102 2103 2104 2105 2106 2107 2108 2109 2110 2111 2112 2113 2114 2115 2116 2117 2118 2119 2120 2121 2122 2123 2124

  if (dbCache) {
    vgHash = dbCache->vgInfo->vgHash;
  } else {
    vgHash = vgInfo->vgHash;
  }

  if (tbMeta->tableType == TSDB_SUPER_TABLE) {
    CTG_ERR_JRET(ctgGenerateVgList(pCtg, vgHash, pVgList));
  } else {
    // USE HASH METHOD INSTEAD OF VGID IN TBMETA
    ctgError("invalid method to get none stb vgInfo, tbType:%d", tbMeta->tableType);
    CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT);
    
#if 0  
    int32_t vgId = tbMeta->vgId;
    if (taosHashGetDup(vgHash, &vgId, sizeof(vgId), &vgroupInfo) != 0) {
      ctgWarn("table's vgId not found in vgroup list, vgId:%d, tbName:%s", vgId, tNameGetTableName(pTableName));
      CTG_ERR_JRET(TSDB_CODE_CTG_VG_META_MISMATCH);
    }

    vgList = taosArrayInit(1, sizeof(SVgroupInfo));
    if (NULL == vgList) {
      ctgError("taosArrayInit %d failed", (int32_t)sizeof(SVgroupInfo));
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);    
    }

    if (NULL == taosArrayPush(vgList, &vgroupInfo)) {
      ctgError("taosArrayPush vgroupInfo to array failed, vgId:%d, tbName:%s", vgId, tNameGetTableName(pTableName));
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
    }

    *pVgList = vgList;
    vgList = NULL;
#endif    
  }

_return:

  if (dbCache) {
    ctgReleaseVgInfo(dbCache);
    ctgReleaseDBCache(pCtg, dbCache);
  }

wafwerar's avatar
wafwerar 已提交
2125
  taosMemoryFreeClear(tbMeta);
D
dapan1121 已提交
2126 2127 2128

  if (vgInfo) {
    taosHashCleanup(vgInfo->vgHash);
wafwerar's avatar
wafwerar 已提交
2129
    taosMemoryFreeClear(vgInfo);
D
dapan1121 已提交
2130 2131 2132 2133 2134 2135 2136 2137 2138 2139
  }

  if (vgList) {
    taosArrayDestroy(vgList);
    vgList = NULL;
  }

  CTG_RET(code);
}

D
dapan1121 已提交
2140

D
dapan1121 已提交
2141
int32_t catalogInit(SCatalogCfg *cfg) {
D
dapan 已提交
2142
  if (gCtgMgmt.pCluster) {
D
dapan 已提交
2143
    qError("catalog already initialized");
D
dapan1121 已提交
2144
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
2145 2146
  }

wafwerar's avatar
wafwerar 已提交
2147
  atomic_store_8((int8_t*)&gCtgMgmt.exit, false);
D
dapan1121 已提交
2148

D
dapan1121 已提交
2149
  if (cfg) {
D
dapan 已提交
2150
    memcpy(&gCtgMgmt.cfg, cfg, sizeof(*cfg));
H
Haojun Liao 已提交
2151

D
dapan 已提交
2152 2153
    if (gCtgMgmt.cfg.maxDBCacheNum == 0) {
      gCtgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
D
dapan1121 已提交
2154 2155
    }

D
dapan 已提交
2156 2157
    if (gCtgMgmt.cfg.maxTblCacheNum == 0) {
      gCtgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TBLMETA_NUMBER;
D
dapan1121 已提交
2158
    }
D
dapan1121 已提交
2159

D
dapan 已提交
2160 2161
    if (gCtgMgmt.cfg.dbRentSec == 0) {
      gCtgMgmt.cfg.dbRentSec = CTG_DEFAULT_RENT_SECOND;
D
dapan1121 已提交
2162 2163
    }

D
dapan 已提交
2164 2165
    if (gCtgMgmt.cfg.stbRentSec == 0) {
      gCtgMgmt.cfg.stbRentSec = CTG_DEFAULT_RENT_SECOND;
D
dapan1121 已提交
2166
    }
D
dapan1121 已提交
2167
  } else {
D
dapan 已提交
2168 2169 2170 2171
    gCtgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
    gCtgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TBLMETA_NUMBER;
    gCtgMgmt.cfg.dbRentSec = CTG_DEFAULT_RENT_SECOND;
    gCtgMgmt.cfg.stbRentSec = CTG_DEFAULT_RENT_SECOND;
D
dapan 已提交
2172 2173
  }

D
dapan 已提交
2174 2175
  gCtgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == gCtgMgmt.pCluster) {
D
dapan1121 已提交
2176 2177
    qError("taosHashInit %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER);
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
2178 2179
  }

D
dapan1121 已提交
2180 2181 2182 2183 2184 2185 2186 2187 2188
  if (tsem_init(&gCtgMgmt.queue.reqSem, 0, 0)) {
    qError("tsem_init failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
    CTG_ERR_RET(TSDB_CODE_CTG_SYS_ERROR);
  }
  
  if (tsem_init(&gCtgMgmt.queue.rspSem, 0, 0)) {
    qError("tsem_init failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
    CTG_ERR_RET(TSDB_CODE_CTG_SYS_ERROR);
  }
D
dapan1121 已提交
2189

wafwerar's avatar
wafwerar 已提交
2190
  gCtgMgmt.queue.head = taosMemoryCalloc(1, sizeof(SCtgQNode));
D
dapan1121 已提交
2191
  if (NULL == gCtgMgmt.queue.head) {
D
dapan1121 已提交
2192 2193 2194
    qError("calloc %d failed", (int32_t)sizeof(SCtgQNode));
    CTG_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
D
dapan1121 已提交
2195
  gCtgMgmt.queue.tail = gCtgMgmt.queue.head;
D
dapan1121 已提交
2196

D
dapan1121 已提交
2197 2198
  CTG_ERR_RET(ctgStartUpdateThread());

D
dapan 已提交
2199
  qDebug("catalog initialized, maxDb:%u, maxTbl:%u, dbRentSec:%u, stbRentSec:%u", gCtgMgmt.cfg.maxDBCacheNum, gCtgMgmt.cfg.maxTblCacheNum, gCtgMgmt.cfg.dbRentSec, gCtgMgmt.cfg.stbRentSec);
D
dapan1121 已提交
2200

D
dapan 已提交
2201
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
2202 2203
}

D
dapan1121 已提交
2204
int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
2205
  if (NULL == catalogHandle) {
D
dapan1121 已提交
2206
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan 已提交
2207 2208
  }

D
dapan 已提交
2209
  if (NULL == gCtgMgmt.pCluster) {
D
dapan 已提交
2210
    qError("catalog cluster cache are not ready, clusterId:%"PRIx64, clusterId);
D
dapan1121 已提交
2211
    CTG_ERR_RET(TSDB_CODE_CTG_NOT_READY);
D
dapan 已提交
2212 2213
  }

D
dapan1121 已提交
2214 2215
  int32_t code = 0;
  SCatalog *clusterCtg = NULL;
D
dapan 已提交
2216

D
dapan1121 已提交
2217
  while (true) {
D
dapan 已提交
2218
    SCatalog **ctg = (SCatalog **)taosHashGet(gCtgMgmt.pCluster, (char*)&clusterId, sizeof(clusterId));
D
dapan 已提交
2219

D
dapan1121 已提交
2220 2221 2222 2223 2224
    if (ctg && (*ctg)) {
      *catalogHandle = *ctg;
      qDebug("got catalog handle from cache, clusterId:%"PRIx64", CTG:%p", clusterId, *ctg);
      return TSDB_CODE_SUCCESS;
    }
D
dapan 已提交
2225

wafwerar's avatar
wafwerar 已提交
2226
    clusterCtg = taosMemoryCalloc(1, sizeof(SCatalog));
D
dapan1121 已提交
2227 2228 2229 2230 2231
    if (NULL == clusterCtg) {
      qError("calloc %d failed", (int32_t)sizeof(SCatalog));
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }

D
dapan1121 已提交
2232 2233
    clusterCtg->clusterId = clusterId;

D
dapan 已提交
2234 2235
    CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->dbRent, gCtgMgmt.cfg.dbRentSec, CTG_RENT_DB));
    CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->stbRent, gCtgMgmt.cfg.stbRentSec, CTG_RENT_STABLE));
D
dapan1121 已提交
2236

D
dapan 已提交
2237
    clusterCtg->dbCache = taosHashInit(gCtgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
D
dapan1121 已提交
2238 2239 2240 2241 2242
    if (NULL == clusterCtg->dbCache) {
      qError("taosHashInit %d dbCache failed", CTG_DEFAULT_CACHE_DB_NUMBER);
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
    }

D
dapan 已提交
2243
    SHashObj *metaCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
D
dapan1121 已提交
2244
    if (NULL == metaCache) {
D
dapan 已提交
2245
      qError("taosHashInit failed, num:%d", gCtgMgmt.cfg.maxTblCacheNum);
D
dapan1121 已提交
2246 2247 2248
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }
    
D
dapan 已提交
2249
    code = taosHashPut(gCtgMgmt.pCluster, &clusterId, sizeof(clusterId), &clusterCtg, POINTER_BYTES);
D
dapan1121 已提交
2250 2251 2252 2253 2254 2255 2256 2257 2258 2259 2260 2261 2262
    if (code) {
      if (HASH_NODE_EXIST(code)) {
        ctgFreeHandle(clusterCtg);
        continue;
      }
      
      qError("taosHashPut CTG to cache failed, clusterId:%"PRIx64, clusterId);
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
    }

    qDebug("add CTG to cache, clusterId:%"PRIx64", CTG:%p", clusterId, clusterCtg);

    break;
D
dapan 已提交
2263
  }
D
dapan1121 已提交
2264 2265

  *catalogHandle = clusterCtg;
D
dapan1121 已提交
2266 2267

  CTG_CACHE_STAT_ADD(clusterNum, 1);
D
dapan 已提交
2268
  
D
dapan1121 已提交
2269
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
2270 2271 2272 2273 2274 2275 2276 2277

_return:

  ctgFreeHandle(clusterCtg);
  
  CTG_RET(code);
}

D
dapan1121 已提交
2278 2279
void catalogFreeHandle(SCatalog* pCtg) {
  if (NULL == pCtg) {
D
dapan1121 已提交
2280 2281
    return;
  }
D
dapan1121 已提交
2282

D
dapan 已提交
2283
  if (taosHashRemove(gCtgMgmt.pCluster, &pCtg->clusterId, sizeof(pCtg->clusterId))) {
D
dapan1121 已提交
2284
    ctgWarn("taosHashRemove from cluster failed, may already be freed, clusterId:%"PRIx64, pCtg->clusterId);
D
dapan1121 已提交
2285 2286 2287
    return;
  }

D
dapan1121 已提交
2288 2289
  CTG_CACHE_STAT_SUB(clusterNum, 1);

D
dapan1121 已提交
2290
  uint64_t clusterId = pCtg->clusterId;
D
dapan1121 已提交
2291
  
D
dapan1121 已提交
2292
  ctgFreeHandle(pCtg);
D
dapan1121 已提交
2293
  
D
dapan1121 已提交
2294
  ctgInfo("handle freed, culsterId:%"PRIx64, clusterId);
D
dapan 已提交
2295 2296
}

D
dapan1121 已提交
2297
int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId, int32_t *tableNum) {
D
dapan1121 已提交
2298 2299
  CTG_API_ENTER();

D
dapan1121 已提交
2300
  if (NULL == pCtg || NULL == dbFName || NULL == version || NULL == dbId) {
D
dapan1121 已提交
2301 2302 2303 2304
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }

  SCtgDBCache *dbCache = NULL;
D
dapan 已提交
2305
  bool inCache = false;
D
dapan1121 已提交
2306
  int32_t code = 0;
D
dapan1121 已提交
2307

D
dapan1121 已提交
2308 2309
  CTG_ERR_JRET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache, &inCache));
  if (!inCache) {
D
dapan1121 已提交
2310
    *version = CTG_DEFAULT_INVALID_VERSION;
D
dapan1121 已提交
2311
    CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
2312 2313
  }

D
dapan1121 已提交
2314
  *version = dbCache->vgInfo->vgVersion;
D
dapan1121 已提交
2315
  *dbId = dbCache->dbId;
D
dapan1121 已提交
2316
  *tableNum = dbCache->vgInfo->numOfTable;
D
dapan1121 已提交
2317 2318 2319

  ctgReleaseVgInfo(dbCache);
  ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
2320

D
dapan1121 已提交
2321
  ctgDebug("Got db vgVersion from cache, dbFName:%s, vgVersion:%d", dbFName, *version);
D
dapan1121 已提交
2322

D
dapan1121 已提交
2323
  CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
2324 2325 2326 2327

_return:

  CTG_API_LEAVE(code);
D
dapan1121 已提交
2328 2329
}

D
dapan1121 已提交
2330
int32_t catalogGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, SArray** vgroupList) {
D
dapan1121 已提交
2331 2332
  CTG_API_ENTER();

D
dapan1121 已提交
2333 2334 2335
  if (NULL == pCtg || NULL == dbFName || NULL == pRpc || NULL == pMgmtEps || NULL == vgroupList) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }
2336

D
dapan1121 已提交
2337
  SCtgDBCache* dbCache = NULL;
2338
  int32_t code = 0;
D
dapan1121 已提交
2339
  SArray *vgList = NULL;
D
dapan1121 已提交
2340 2341
  SHashObj *vgHash = NULL;
  SDBVgInfo *vgInfo = NULL;
D
dapan1121 已提交
2342
  CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pRpc, pMgmtEps, dbFName, &dbCache, &vgInfo));
D
dapan1121 已提交
2343 2344 2345 2346
  if (dbCache) {
    vgHash = dbCache->vgInfo->vgHash;
  } else {
    vgHash = vgInfo->vgHash;
D
dapan1121 已提交
2347 2348
  }

D
dapan1121 已提交
2349
  CTG_ERR_JRET(ctgGenerateVgList(pCtg, vgHash, &vgList));
D
dapan1121 已提交
2350 2351 2352 2353 2354

  *vgroupList = vgList;
  vgList = NULL;

_return:
D
dapan1121 已提交
2355 2356

  if (dbCache) {
D
dapan1121 已提交
2357 2358
    ctgReleaseVgInfo(dbCache);
    ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
2359 2360
  }

D
dapan1121 已提交
2361 2362
  if (vgInfo) {
    taosHashCleanup(vgInfo->vgHash);
wafwerar's avatar
wafwerar 已提交
2363
    taosMemoryFreeClear(vgInfo);
D
dapan1121 已提交
2364 2365
  }

D
dapan1121 已提交
2366
  CTG_API_LEAVE(code);  
D
dapan1121 已提交
2367 2368 2369
}


D
dapan1121 已提交
2370
int32_t catalogUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SDBVgInfo* dbInfo) {
D
dapan1121 已提交
2371
  CTG_API_ENTER();
D
dapan1121 已提交
2372 2373

  int32_t code = 0;
D
dapan1121 已提交
2374
  
D
dapan1121 已提交
2375
  if (NULL == pCtg || NULL == dbFName || NULL == dbInfo) {
D
dapan1121 已提交
2376
    ctgFreeVgInfo(dbInfo);
D
dapan1121 已提交
2377 2378 2379
    CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT);
  }

D
dapan1121 已提交
2380
  code = ctgPushUpdateVgMsgInQueue(pCtg, dbFName, dbId, dbInfo, false);
D
dapan1121 已提交
2381

D
dapan1121 已提交
2382 2383
_return:

D
dapan1121 已提交
2384
  CTG_API_LEAVE(code);
D
dapan1121 已提交
2385 2386 2387
}


D
dapan1121 已提交
2388 2389 2390
int32_t catalogRemoveDB(SCatalog* pCtg, const char* dbFName, uint64_t dbId) {
  CTG_API_ENTER();

D
dapan1121 已提交
2391 2392
  int32_t code = 0;
  
D
dapan1121 已提交
2393 2394
  if (NULL == pCtg || NULL == dbFName) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
2395 2396
  }

D
dapan1121 已提交
2397
  if (NULL == pCtg->dbCache) {
D
dapan1121 已提交
2398
    CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
2399
  }
D
dapan1121 已提交
2400

D
dapan1121 已提交
2401
  CTG_ERR_JRET(ctgPushRmDBMsgInQueue(pCtg, dbFName, dbId));
D
dapan 已提交
2402

D
dapan1121 已提交
2403
  CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
2404
  
D
dapan1121 已提交
2405 2406
_return:

D
dapan1121 已提交
2407
  CTG_API_LEAVE(code);
D
dapan1121 已提交
2408 2409
}

D
dapan1121 已提交
2410 2411 2412
int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId, SEpSet *epSet) {

}
D
dapan1121 已提交
2413

D
dapan1121 已提交
2414
int32_t catalogRemoveTableMeta(SCatalog* pCtg, const SName* pTableName) {
D
dapan 已提交
2415 2416 2417 2418 2419 2420 2421 2422 2423 2424 2425 2426 2427
  CTG_API_ENTER();

  int32_t code = 0;
  
  if (NULL == pCtg || NULL == pTableName) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }

  if (NULL == pCtg->dbCache) {
    CTG_API_LEAVE(TSDB_CODE_SUCCESS);
  }

  STableMeta *tblMeta = NULL;
D
dapan1121 已提交
2428
  bool inCache = false;
D
dapan 已提交
2429
  uint64_t dbId = 0;
D
dapan1121 已提交
2430
  CTG_ERR_JRET(ctgGetTableMetaFromCache(pCtg, pTableName, &tblMeta, &inCache, 0, &dbId));
D
dapan 已提交
2431

D
dapan1121 已提交
2432
  if (!inCache) {
D
dapan 已提交
2433 2434 2435 2436 2437 2438 2439 2440
    ctgDebug("table already not in cache, db:%s, tblName:%s", pTableName->dbname, pTableName->tname);
    goto _return;
  }

  char dbFName[TSDB_DB_FNAME_LEN];
  tNameGetFullDbName(pTableName, dbFName);
  
  if (TSDB_SUPER_TABLE == tblMeta->tableType) {
D
dapan1121 已提交
2441
    CTG_ERR_JRET(ctgPushRmStbMsgInQueue(pCtg, dbFName, dbId, pTableName->tname, tblMeta->suid, true));
D
dapan 已提交
2442
  } else {
D
dapan1121 已提交
2443
    CTG_ERR_JRET(ctgPushRmTblMsgInQueue(pCtg, dbFName, dbId, pTableName->tname, true));
D
dapan 已提交
2444 2445 2446 2447 2448
  }

 
_return:

wafwerar's avatar
wafwerar 已提交
2449
  taosMemoryFreeClear(tblMeta);
D
dapan 已提交
2450 2451 2452 2453 2454

  CTG_API_LEAVE(code);
}


D
dapan1121 已提交
2455 2456 2457
int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, const char* stbName, uint64_t suid) {
  CTG_API_ENTER();

D
dapan 已提交
2458 2459
  int32_t code = 0;
  
D
dapan1121 已提交
2460 2461
  if (NULL == pCtg || NULL == dbFName || NULL == stbName) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan 已提交
2462 2463
  }

D
dapan1121 已提交
2464
  if (NULL == pCtg->dbCache) {
D
dapan1121 已提交
2465
    CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan 已提交
2466
  }
D
dapan1121 已提交
2467

D
dapan1121 已提交
2468
  CTG_ERR_JRET(ctgPushRmStbMsgInQueue(pCtg, dbFName, dbId, stbName, suid, true));
D
dapan 已提交
2469

D
dapan1121 已提交
2470
  CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan 已提交
2471
  
D
dapan1121 已提交
2472 2473
_return:

D
dapan1121 已提交
2474
  CTG_API_LEAVE(code);
D
dapan 已提交
2475 2476
}

D
dapan1121 已提交
2477 2478 2479
int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, const char *pIndexName, SIndexMeta** pIndexMeta) {

}
D
dapan1121 已提交
2480

D
dapan1121 已提交
2481
int32_t catalogGetTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
D
dapan1121 已提交
2482 2483
  CTG_API_ENTER();

D
dapan1121 已提交
2484
  CTG_API_LEAVE(ctgGetTableMeta(pCtg, pTrans, pMgmtEps, pTableName, pTableMeta, CTG_FLAG_UNKNOWN_STB));
D
dapan1121 已提交
2485
}
D
dapan1121 已提交
2486

D
dapan1121 已提交
2487
int32_t catalogGetSTableMeta(SCatalog* pCtg, void * pTrans, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
D
dapan1121 已提交
2488 2489
  CTG_API_ENTER();

D
dapan1121 已提交
2490
  CTG_API_LEAVE(ctgGetTableMeta(pCtg, pTrans, pMgmtEps, pTableName, pTableMeta, CTG_FLAG_STB));
D
dapan1121 已提交
2491 2492
}

D
dapan1121 已提交
2493 2494 2495 2496 2497 2498 2499
int32_t catalogUpdateSTableMeta(SCatalog* pCtg, STableMetaRsp *rspMsg) {
  CTG_API_ENTER();

  if (NULL == pCtg || NULL == rspMsg) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }

wafwerar's avatar
wafwerar 已提交
2500
  STableMetaOutput *output = taosMemoryCalloc(1, sizeof(STableMetaOutput));
D
dapan1121 已提交
2501 2502 2503 2504 2505
  if (NULL == output) {
    ctgError("malloc %d failed", (int32_t)sizeof(STableMetaOutput));
    CTG_API_LEAVE(TSDB_CODE_CTG_MEM_ERROR);
  }
  
D
dapan1121 已提交
2506 2507
  int32_t code = 0;

D
dapan1121 已提交
2508 2509
  strcpy(output->dbFName, rspMsg->dbFName);
  strcpy(output->tbName, rspMsg->tbName);
D
dapan1121 已提交
2510

D
dapan1121 已提交
2511
  output->dbId = rspMsg->dbId;
D
dapan1121 已提交
2512
  
D
dapan1121 已提交
2513
  SET_META_TYPE_TABLE(output->metaType);
D
dapan1121 已提交
2514
  
D
dapan1121 已提交
2515
  CTG_ERR_JRET(queryCreateTableMetaFromMsg(rspMsg, true, &output->tbMeta));
D
dapan1121 已提交
2516

D
dapan1121 已提交
2517
  CTG_ERR_JRET(ctgPushUpdateTblMsgInQueue(pCtg, output, false));
D
dapan 已提交
2518

D
dapan1121 已提交
2519 2520
  CTG_API_LEAVE(code);
  
D
dapan1121 已提交
2521 2522
_return:

wafwerar's avatar
wafwerar 已提交
2523 2524
  taosMemoryFreeClear(output->tbMeta);
  taosMemoryFreeClear(output);
D
dapan1121 已提交
2525
  
D
dapan1121 已提交
2526
  CTG_API_LEAVE(code);
D
dapan1121 已提交
2527 2528
}

D
dapan1121 已提交
2529 2530 2531 2532 2533 2534 2535 2536 2537
int32_t catalogRefreshDBVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* dbFName) {
  CTG_API_ENTER();

  if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == dbFName) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }

  CTG_API_LEAVE(ctgRefreshDBVgInfo(pCtg, pTrans, pMgmtEps, dbFName));
}
D
dapan1121 已提交
2538

D
dapan1121 已提交
2539
int32_t catalogRefreshTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) {
D
dapan1121 已提交
2540 2541
  CTG_API_ENTER();

D
dapan1121 已提交
2542
  if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pTableName) {
D
dapan1121 已提交
2543 2544 2545
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }

D
dapan1121 已提交
2546
  CTG_API_LEAVE(ctgRefreshTblMeta(pCtg, pTrans, pMgmtEps, pTableName, CTG_FLAG_FORCE_UPDATE | CTG_FLAG_MAKE_STB(isSTable), NULL, true));
2547
}
2548

D
dapan1121 已提交
2549
int32_t catalogRefreshGetTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable) {
D
dapan1121 已提交
2550 2551
  CTG_API_ENTER();

D
dapan1121 已提交
2552
  CTG_API_LEAVE(ctgGetTableMeta(pCtg, pTrans, pMgmtEps, pTableName, pTableMeta, CTG_FLAG_FORCE_UPDATE | CTG_FLAG_MAKE_STB(isSTable)));
D
dapan1121 已提交
2553 2554
}

D
dapan1121 已提交
2555
int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgList) {
D
dapan1121 已提交
2556
  CTG_API_ENTER();
D
dapan1121 已提交
2557 2558 2559 2560

  if (NULL == pCtg || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pVgList) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }
D
dapan1121 已提交
2561 2562 2563 2564 2565

  if (CTG_IS_INF_DBNAME(pTableName->dbname)) {
    ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname);
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }
D
dapan1121 已提交
2566

D
dapan1121 已提交
2567
  int32_t code = 0;
D
dapan1121 已提交
2568

D
dapan1121 已提交
2569 2570 2571 2572 2573
  while (true) {
    code = ctgGetTableDistVgInfo(pCtg, pRpc, pMgmtEps, pTableName, pVgList);
    if (code) {
      if (TSDB_CODE_CTG_VG_META_MISMATCH == code) {
        CTG_ERR_JRET(ctgRefreshTblMeta(pCtg, pRpc, pMgmtEps, pTableName, CTG_FLAG_FORCE_UPDATE | CTG_FLAG_MAKE_STB(CTG_FLAG_UNKNOWN_STB), NULL, true));
D
dapan 已提交
2574

D
dapan1121 已提交
2575 2576 2577 2578 2579 2580
        char dbFName[TSDB_DB_FNAME_LEN] = {0};
        tNameGetFullDbName(pTableName, dbFName);        
        CTG_ERR_JRET(ctgRefreshDBVgInfo(pCtg, pRpc, pMgmtEps, dbFName));
        
        continue;
      }
D
dapan1121 已提交
2581
    }
D
dapan 已提交
2582

D
dapan1121 已提交
2583
    break;
D
dapan1121 已提交
2584
  }
D
dapan 已提交
2585

D
dapan1121 已提交
2586
_return:
D
dapan 已提交
2587

D
dapan1121 已提交
2588
  CTG_API_LEAVE(code);
D
dapan1121 已提交
2589 2590 2591
}


D
dapan1121 已提交
2592
int32_t catalogGetTableHashVgroup(SCatalog *pCtg, void *pTrans, const SEpSet *pMgmtEps, const SName *pTableName, SVgroupInfo *pVgroup) {
D
dapan1121 已提交
2593 2594
  CTG_API_ENTER();

D
dapan1121 已提交
2595 2596 2597 2598 2599
  if (CTG_IS_INF_DBNAME(pTableName->dbname)) {
    ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname);
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }

D
dapan1121 已提交
2600 2601
  SCtgDBCache* dbCache = NULL;
  int32_t code = 0;
H
Haojun Liao 已提交
2602 2603
  char db[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, db);
D
dapan1121 已提交
2604

D
dapan1121 已提交
2605
  SDBVgInfo *vgInfo = NULL;
D
dapan1121 已提交
2606
  CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pTrans, pMgmtEps, db, &dbCache, &vgInfo));
D
dapan1121 已提交
2607

D
dapan1121 已提交
2608
  CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, vgInfo ? vgInfo : dbCache->vgInfo, pTableName, pVgroup));
D
dapan1121 已提交
2609

D
dapan1121 已提交
2610
_return:
D
dapan1121 已提交
2611

D
dapan1121 已提交
2612
  if (dbCache) {
D
dapan1121 已提交
2613 2614 2615 2616 2617 2618
    ctgReleaseVgInfo(dbCache);
    ctgReleaseDBCache(pCtg, dbCache);
  }

  if (vgInfo) {
    taosHashCleanup(vgInfo->vgHash);
wafwerar's avatar
wafwerar 已提交
2619
    taosMemoryFreeClear(vgInfo);
D
dapan1121 已提交
2620
  }
D
dapan1121 已提交
2621

D
dapan1121 已提交
2622
  CTG_API_LEAVE(code);
D
dapan1121 已提交
2623 2624 2625
}


D
dapan1121 已提交
2626
int32_t catalogGetAllMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp) {
D
dapan1121 已提交
2627 2628
  CTG_API_ENTER();

D
dapan1121 已提交
2629
  if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pReq || NULL == pRsp) {
D
dapan1121 已提交
2630 2631 2632
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }

D
dapan1121 已提交
2633
  int32_t code = 0;
D
dapan1121 已提交
2634
  pRsp->pTableMeta = NULL;
D
dapan1121 已提交
2635 2636 2637

  if (pReq->pTableName) {
    int32_t tbNum = (int32_t)taosArrayGetSize(pReq->pTableName);
D
dapan1121 已提交
2638
    if (tbNum <= 0) {
D
dapan1121 已提交
2639
      ctgError("empty table name list, tbNum:%d", tbNum);
D
dapan1121 已提交
2640
      CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
2641
    }
H
Haojun Liao 已提交
2642

D
dapan1121 已提交
2643 2644
    pRsp->pTableMeta = taosArrayInit(tbNum, POINTER_BYTES);
    if (NULL == pRsp->pTableMeta) {
D
dapan1121 已提交
2645
      ctgError("taosArrayInit %d failed", tbNum);
D
dapan1121 已提交
2646
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
2647 2648 2649 2650 2651 2652
    }
    
    for (int32_t i = 0; i < tbNum; ++i) {
      SName *name = taosArrayGet(pReq->pTableName, i);
      STableMeta *pTableMeta = NULL;
      
D
dapan1121 已提交
2653
      CTG_ERR_JRET(ctgGetTableMeta(pCtg, pTrans, pMgmtEps, name, &pTableMeta, CTG_FLAG_UNKNOWN_STB));
D
dapan1121 已提交
2654 2655 2656

      if (NULL == taosArrayPush(pRsp->pTableMeta, &pTableMeta)) {
        ctgError("taosArrayPush failed, idx:%d", i);
wafwerar's avatar
wafwerar 已提交
2657
        taosMemoryFreeClear(pTableMeta);
D
dapan1121 已提交
2658 2659 2660 2661 2662
        CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
      }
    }
  }

D
dapan1121 已提交
2663 2664 2665 2666
  if (pReq->qNodeRequired) {
    CTG_ERR_JRET(ctgGetQnodeListFromMnode(pCtg, pTrans, pMgmtEps, &pRsp->pEpSetList));
  }

D
dapan1121 已提交
2667
  CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
2668 2669

_return:  
D
dapan1121 已提交
2670

D
dapan1121 已提交
2671 2672 2673 2674
  if (pRsp->pTableMeta) {
    int32_t aSize = taosArrayGetSize(pRsp->pTableMeta);
    for (int32_t i = 0; i < aSize; ++i) {
      STableMeta *pMeta = taosArrayGetP(pRsp->pTableMeta, i);
wafwerar's avatar
wafwerar 已提交
2675
      taosMemoryFreeClear(pMeta);
D
dapan1121 已提交
2676 2677 2678
    }
    
    taosArrayDestroy(pRsp->pTableMeta);
D
dapan1121 已提交
2679
    pRsp->pTableMeta = NULL;
D
dapan1121 已提交
2680
  }
D
dapan 已提交
2681
  
D
dapan1121 已提交
2682
  CTG_API_LEAVE(code);
2683
}
D
dapan 已提交
2684

D
dapan1121 已提交
2685
int32_t catalogGetQnodeList(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, SArray* pQnodeList) {
D
dapan1121 已提交
2686
  CTG_API_ENTER();
D
dapan1121 已提交
2687 2688
  
  int32_t code = 0;
D
dapan1121 已提交
2689 2690 2691 2692
  if (NULL == pCtg || NULL == pRpc  || NULL == pMgmtEps || NULL == pQnodeList) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }

D
dapan1121 已提交
2693 2694 2695
  CTG_ERR_JRET(ctgGetQnodeListFromMnode(pCtg, pRpc, pMgmtEps, &pQnodeList));

_return:
D
dapan 已提交
2696

D
dapan1121 已提交
2697
  CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan 已提交
2698 2699
}

D
dapan1121 已提交
2700
int32_t catalogGetExpiredSTables(SCatalog* pCtg, SSTableMetaVersion **stables, uint32_t *num) {
D
dapan1121 已提交
2701 2702
  CTG_API_ENTER();

D
dapan1121 已提交
2703 2704
  if (NULL == pCtg || NULL == stables || NULL == num) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
2705 2706
  }

D
dapan1121 已提交
2707 2708 2709 2710
  CTG_API_LEAVE(ctgMetaRentGet(&pCtg->stbRent, (void **)stables, num, sizeof(SSTableMetaVersion)));
}

int32_t catalogGetExpiredDBs(SCatalog* pCtg, SDbVgVersion **dbs, uint32_t *num) {
D
dapan1121 已提交
2711
  CTG_API_ENTER();
D
dapan1121 已提交
2712 2713 2714 2715
  
  if (NULL == pCtg || NULL == dbs || NULL == num) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }
D
dapan1121 已提交
2716

D
dapan1121 已提交
2717
  CTG_API_LEAVE(ctgMetaRentGet(&pCtg->dbRent, (void **)dbs, num, sizeof(SDbVgVersion)));
D
dapan1121 已提交
2718 2719
}

D
dapan 已提交
2720

D
dapan 已提交
2721
void catalogDestroy(void) {
D
dapan1121 已提交
2722 2723
  qInfo("start to destroy catalog");
  
wafwerar's avatar
wafwerar 已提交
2724
  if (NULL == gCtgMgmt.pCluster || atomic_load_8((int8_t*)&gCtgMgmt.exit)) {
D
dapan1121 已提交
2725 2726 2727
    return;
  }

wafwerar's avatar
wafwerar 已提交
2728
  atomic_store_8((int8_t*)&gCtgMgmt.exit, true);
D
dapan 已提交
2729

D
dapan1121 已提交
2730 2731 2732 2733 2734 2735 2736
  if (tsem_post(&gCtgMgmt.queue.reqSem)) {
    qError("tsem_post failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
  }
  
  if (tsem_post(&gCtgMgmt.queue.rspSem)) {
    qError("tsem_post failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
  }
D
dapan1121 已提交
2737

D
dapan1121 已提交
2738
  while (CTG_IS_LOCKED(&gCtgMgmt.lock)) {
wafwerar's avatar
wafwerar 已提交
2739
    taosUsleep(1);
D
dapan1121 已提交
2740 2741
  }
  
D
dapan 已提交
2742
  CTG_LOCK(CTG_WRITE, &gCtgMgmt.lock);
D
dapan1121 已提交
2743

D
dapan1121 已提交
2744
  SCatalog *pCtg = NULL;
D
dapan 已提交
2745
  void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
D
dapan1121 已提交
2746
  while (pIter) {
D
dapan1121 已提交
2747
    pCtg = *(SCatalog **)pIter;
D
dapan1121 已提交
2748

D
dapan1121 已提交
2749 2750
    if (pCtg) {
      catalogFreeHandle(pCtg);
D
dapan1121 已提交
2751 2752
    }
    
D
dapan 已提交
2753
    pIter = taosHashIterate(gCtgMgmt.pCluster, pIter);
D
dapan 已提交
2754
  }
D
dapan1121 已提交
2755
  
D
dapan 已提交
2756 2757
  taosHashCleanup(gCtgMgmt.pCluster);
  gCtgMgmt.pCluster = NULL;
D
dapan1121 已提交
2758

D
dapan 已提交
2759
  CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.lock);
D
dapan1121 已提交
2760

D
dapan1121 已提交
2761
  qInfo("catalog destroyed");
D
dapan 已提交
2762 2763 2764 2765
}