catalog.c 33.6 KB
Newer Older
H
Hongze Cheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
H
Haojun Liao 已提交
14 15
 */

D
dapan1121 已提交
16
#include "trpc.h"
D
dapan1121 已提交
17
#include "query.h"
D
dapan1121 已提交
18
#include "tname.h"
H
Haojun Liao 已提交
19
#include "catalogInt.h"
20
#include "systable.h"
D
dapan1121 已提交
21
#include "tref.h"
22

D
dapan 已提交
23
SCatalogMgmt gCtgMgmt = {0};
D
dapan1121 已提交
24

D
dapan1121 已提交
25

D
dapan1121 已提交
26 27 28 29 30 31 32 33
int32_t ctgRemoveTbMetaFromCache(SCatalog* pCtg, SName* pTableName, bool syncReq) {
  int32_t code = 0;
  STableMeta *tblMeta = NULL;
  SCtgTbMetaCtx tbCtx = {0};
  tbCtx.flag = CTG_FLAG_UNKNOWN_STB;
  tbCtx.pName = pTableName;
  
  CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &tbCtx, &tblMeta));
D
dapan 已提交
34

D
dapan1121 已提交
35 36
  if (NULL == tblMeta) {
    ctgDebug("table already not in cache, db:%s, tblName:%s", pTableName->dbname, pTableName->tname);
D
dapan1121 已提交
37
    return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
38
  }
D
dapan1121 已提交
39

D
dapan1121 已提交
40 41
  char dbFName[TSDB_DB_FNAME_LEN];
  tNameGetFullDbName(pTableName, dbFName);
D
dapan1121 已提交
42
  
D
dapan1121 已提交
43 44 45 46
  if (TSDB_SUPER_TABLE == tblMeta->tableType) {
    CTG_ERR_JRET(ctgPutRmStbToQueue(pCtg, dbFName, tbCtx.tbInfo.dbId, pTableName->tname, tblMeta->suid, syncReq));
  } else {
    CTG_ERR_JRET(ctgPutRmTbToQueue(pCtg, dbFName, tbCtx.tbInfo.dbId, pTableName->tname, syncReq));
D
dapan 已提交
47
  }
D
dapan1121 已提交
48 49
 
_return:
D
dapan 已提交
50

D
dapan1121 已提交
51
  taosMemoryFreeClear(tblMeta);
D
dapan 已提交
52

D
dapan1121 已提交
53
  CTG_RET(code);
D
dapan 已提交
54 55
}

D
dapan1121 已提交
56
int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* dbFName, SCtgDBCache** dbCache, SDBVgInfo **pInfo) {
D
dapan1121 已提交
57
  int32_t code = 0;
D
dapan1121 已提交
58

D
dapan1121 已提交
59
  CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, dbCache));
D
dapan1121 已提交
60

D
dapan1121 已提交
61
  if (*dbCache) {
D
dapan1121 已提交
62
    return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
63 64 65 66 67
  }

  SUseDbOutput DbOut = {0};
  SBuildUseDBInput input = {0};

D
dapan1121 已提交
68
  tstrncpy(input.db, dbFName, tListLen(input.db));
D
dapan1121 已提交
69
  input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
H
Haojun Liao 已提交
70

D
dapan1121 已提交
71
  CTG_ERR_RET(ctgGetDBVgInfoFromMnode(pCtg, pTrans, pMgmtEps, &input, &DbOut, NULL));
D
dapan1121 已提交
72

D
dapan 已提交
73
  CTG_ERR_JRET(ctgCloneVgInfo(DbOut.dbVgroup, pInfo));
D
dapan1121 已提交
74

D
dapan1121 已提交
75
  CTG_ERR_RET(ctgPutUpdateVgToQueue(pCtg, dbFName, DbOut.dbId, DbOut.dbVgroup, false));
D
dapan 已提交
76

D
dapan1121 已提交
77
  return TSDB_CODE_SUCCESS;
D
dapan 已提交
78 79 80

_return:

wafwerar's avatar
wafwerar 已提交
81
  taosMemoryFreeClear(*pInfo);
D
dapan 已提交
82 83 84
  *pInfo = DbOut.dbVgroup;
  
  CTG_RET(code);
D
dapan1121 已提交
85 86
}

D
dapan1121 已提交
87
int32_t ctgRefreshDBVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* dbFName) {
D
dapan1121 已提交
88
  int32_t code = 0;
D
dapan1121 已提交
89
  SCtgDBCache* dbCache = NULL;
D
dapan1121 已提交
90

D
dapan1121 已提交
91
  CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache));
D
dapan1121 已提交
92 93 94 95 96

  SUseDbOutput DbOut = {0};
  SBuildUseDBInput input = {0};
  tstrncpy(input.db, dbFName, tListLen(input.db));

D
dapan1121 已提交
97
  if (NULL != dbCache) {
D
dapan1121 已提交
98
    input.dbId = dbCache->dbId;
D
dapan1121 已提交
99

D
dapan1121 已提交
100 101
    ctgReleaseVgInfo(dbCache);
    ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
102
  }
D
dapan1121 已提交
103 104 105
  
  input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
  input.numOfTable = 0;
D
dapan1121 已提交
106

D
dapan1121 已提交
107
  code = ctgGetDBVgInfoFromMnode(pCtg, pTrans, pMgmtEps, &input, &DbOut, NULL);
D
dapan1121 已提交
108
  if (code) {
D
dapan1121 已提交
109
    if (CTG_DB_NOT_EXIST(code) && (NULL != dbCache)) {
D
dapan1121 已提交
110
      ctgDebug("db no longer exist, dbFName:%s, dbId:%" PRIx64, input.db, input.dbId);
D
dapan1121 已提交
111
      ctgPutRmDBToQueue(pCtg, input.db, input.dbId);
D
dapan1121 已提交
112 113 114 115 116
    }

    CTG_ERR_RET(code);
  }

D
dapan1121 已提交
117
  CTG_ERR_RET(ctgPutUpdateVgToQueue(pCtg, dbFName, DbOut.dbId, DbOut.dbVgroup, true));
D
dapan 已提交
118

D
dapan1121 已提交
119 120 121
  return TSDB_CODE_SUCCESS;
}

D
dapan 已提交
122 123


D
dapan1121 已提交
124
int32_t ctgRefreshTbMeta(CTG_PARAMS, SCtgTbMetaCtx* ctx, STableMetaOutput **pOutput, bool syncReq) {
D
dapan1121 已提交
125 126 127
  SVgroupInfo vgroupInfo = {0};
  int32_t code = 0;

D
dapan1121 已提交
128 129
  if (!CTG_FLAG_IS_SYS_DB(ctx->flag)) {
    CTG_ERR_RET(catalogGetTableHashVgroup(CTG_PARAMS_LIST(), ctx->pName, &vgroupInfo));
D
dapan1121 已提交
130
  }
D
dapan1121 已提交
131

D
dapan1121 已提交
132
  STableMetaOutput  moutput = {0};
wafwerar's avatar
wafwerar 已提交
133
  STableMetaOutput *output = taosMemoryCalloc(1, sizeof(STableMetaOutput));
D
dapan1121 已提交
134 135 136 137
  if (NULL == output) {
    ctgError("malloc %d failed", (int32_t)sizeof(STableMetaOutput));
    CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
  }
D
dapan1121 已提交
138

D
dapan1121 已提交
139 140
  if (CTG_FLAG_IS_SYS_DB(ctx->flag)) {
    ctgDebug("will refresh tbmeta, supposed in information_schema, tbName:%s", tNameGetTableName(ctx->pName));
D
dapan1121 已提交
141

D
dapan1121 已提交
142 143 144
    CTG_ERR_JRET(ctgGetTbMetaFromMnodeImpl(CTG_PARAMS_LIST(), (char *)ctx->pName->dbname, (char *)ctx->pName->tname, output, NULL));
  } else if (CTG_FLAG_IS_STB(ctx->flag)) {
    ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s", tNameGetTableName(ctx->pName));
D
dapan1121 已提交
145 146

    // if get from mnode failed, will not try vnode
D
dapan1121 已提交
147
    CTG_ERR_JRET(ctgGetTbMetaFromMnode(CTG_PARAMS_LIST(), ctx->pName, output, NULL));
D
dapan1121 已提交
148

D
dapan1121 已提交
149
    if (CTG_IS_META_NULL(output->metaType)) {
D
dapan1121 已提交
150
      CTG_ERR_JRET(ctgGetTbMetaFromVnode(CTG_PARAMS_LIST(), ctx->pName, &vgroupInfo, output, NULL));
D
dapan1121 已提交
151 152
    }
  } else {
D
dapan1121 已提交
153
    ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(ctx->pName), ctx->flag);
D
dapan1121 已提交
154 155

    // if get from vnode failed or no table meta, will not try mnode
D
dapan1121 已提交
156
    CTG_ERR_JRET(ctgGetTbMetaFromVnode(CTG_PARAMS_LIST(), ctx->pName, &vgroupInfo, output, NULL));
D
dapan1121 已提交
157

D
dapan1121 已提交
158
    if (CTG_IS_META_TABLE(output->metaType) && TSDB_SUPER_TABLE == output->tbMeta->tableType) {
D
dapan1121 已提交
159
      ctgDebug("will continue to refresh tbmeta since got stb, tbName:%s", tNameGetTableName(ctx->pName));
D
dapan1121 已提交
160

wafwerar's avatar
wafwerar 已提交
161
      taosMemoryFreeClear(output->tbMeta);
D
dapan1121 已提交
162
      
D
dapan1121 已提交
163
      CTG_ERR_JRET(ctgGetTbMetaFromMnodeImpl(CTG_PARAMS_LIST(), output->dbFName, output->tbName, output, NULL));
D
dapan1121 已提交
164
    } else if (CTG_IS_META_BOTH(output->metaType)) {
D
dapan1121 已提交
165
      int32_t exist = 0;
D
dapan1121 已提交
166 167
      if (!CTG_FLAG_IS_FORCE_UPDATE(ctx->flag)) {
        CTG_ERR_JRET(ctgTbMetaExistInCache(pCtg, output->dbFName, output->tbName, &exist));
D
dapan1121 已提交
168
      }
H
Haojun Liao 已提交
169

D
dapan1121 已提交
170
      if (0 == exist) {
D
dapan1121 已提交
171
        CTG_ERR_JRET(ctgGetTbMetaFromMnodeImpl(CTG_PARAMS_LIST(), output->dbFName, output->tbName, &moutput, NULL));
D
dapan1121 已提交
172

D
dapan1121 已提交
173
        if (CTG_IS_META_NULL(moutput.metaType)) {
D
dapan1121 已提交
174
          SET_META_TYPE_NULL(output->metaType);
D
dapan1121 已提交
175 176
        }
        
wafwerar's avatar
wafwerar 已提交
177
        taosMemoryFreeClear(output->tbMeta);
D
dapan1121 已提交
178
        output->tbMeta = moutput.tbMeta;
D
dapan1121 已提交
179 180
        moutput.tbMeta = NULL;
      } else {
wafwerar's avatar
wafwerar 已提交
181
        taosMemoryFreeClear(output->tbMeta);
D
dapan1121 已提交
182
        
D
dapan1121 已提交
183
        SET_META_TYPE_CTABLE(output->metaType); 
D
dapan1121 已提交
184
      }
D
dapan1121 已提交
185 186 187
    }
  }

D
dapan1121 已提交
188
  if (CTG_IS_META_NULL(output->metaType)) {
D
dapan1121 已提交
189 190
    ctgError("no tbmeta got, tbNmae:%s", tNameGetTableName(ctx->pName));
    ctgRemoveTbMetaFromCache(pCtg, ctx->pName, false);
D
dapan1121 已提交
191 192 193
    CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
  }

D
dapan 已提交
194 195 196 197 198 199
  if (CTG_IS_META_TABLE(output->metaType)) {
    ctgDebug("tbmeta got, dbFName:%s, tbName:%s, tbType:%d", output->dbFName, output->tbName, output->tbMeta->tableType);
  } else {
    ctgDebug("tbmeta got, dbFName:%s, tbName:%s, tbType:%d, stbMetaGot:%d", output->dbFName, output->ctbName, output->ctbMeta.tableType, CTG_IS_META_BOTH(output->metaType));
  }

D
dapan1121 已提交
200 201
  if (pOutput) {
    CTG_ERR_JRET(ctgCloneMetaOutput(output, pOutput));
D
dapan1121 已提交
202
  }
D
dapan1121 已提交
203

D
dapan1121 已提交
204 205 206 207
  CTG_ERR_JRET(ctgPutUpdateTbToQueue(pCtg, output, syncReq));

  return TSDB_CODE_SUCCESS;

D
dapan1121 已提交
208 209
_return:

D
dapan1121 已提交
210 211
  taosMemoryFreeClear(output->tbMeta);
  taosMemoryFreeClear(output);
D
dapan1121 已提交
212 213 214 215
  
  CTG_RET(code);
}

D
dapan1121 已提交
216 217 218
int32_t ctgGetTbMetaFromCache(CTG_PARAMS, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta) {
  if (CTG_IS_SYS_DBNAME(ctx->pName->dbname)) {
    CTG_FLAG_SET_SYS_DB(ctx->flag);
D
dapan1121 已提交
219 220
  }

D
dapan1121 已提交
221 222 223 224 225 226 227 228
  CTG_ERR_RET(ctgReadTbMetaFromCache(pCtg, ctx, pTableMeta));

  if (*pTableMeta) {
    if (CTG_FLAG_MATCH_STB(ctx->flag, (*pTableMeta)->tableType) && ((!CTG_FLAG_IS_FORCE_UPDATE(ctx->flag)) || (CTG_FLAG_IS_SYS_DB(ctx->flag)))) {
      return TSDB_CODE_SUCCESS;
    }

    taosMemoryFreeClear(*pTableMeta);
D
dapan1121 已提交
229
  }
H
Haojun Liao 已提交
230

D
dapan1121 已提交
231 232
  if (CTG_FLAG_IS_UNKNOWN_STB(ctx->flag)) {
    CTG_FLAG_SET_STB(ctx->flag, ctx->tbInfo.tbType);
H
Haojun Liao 已提交
233
  }
D
dapan1121 已提交
234 235 236
  
  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
237

H
Haojun Liao 已提交
238

D
dapan1121 已提交
239 240 241
int32_t ctgGetTbMeta(CTG_PARAMS, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta) {
  int32_t code = 0;
  STableMetaOutput *output = NULL;
D
dapan1121 已提交
242

D
dapan1121 已提交
243
  CTG_ERR_RET(ctgGetTbMetaFromCache(CTG_PARAMS_LIST(), ctx, pTableMeta));
D
dapan1121 已提交
244 245 246
  if (*pTableMeta) {
    goto _return;
  }
H
Haojun Liao 已提交
247

D
dapan1121 已提交
248 249
  while (true) {
    CTG_ERR_JRET(ctgRefreshTbMeta(CTG_PARAMS_LIST(), ctx, &output, false));
D
dapan1121 已提交
250

D
dapan1121 已提交
251 252 253
    if (CTG_IS_META_TABLE(output->metaType)) {
      *pTableMeta = output->tbMeta;
      goto _return;
D
dapan 已提交
254
    }
D
dapan1121 已提交
255 256 257 258 259 260 261 262 263 264 265 266

    if (CTG_IS_META_BOTH(output->metaType)) {
      memcpy(output->tbMeta, &output->ctbMeta, sizeof(output->ctbMeta));
      
      *pTableMeta = output->tbMeta;
      goto _return;
    }

    if ((!CTG_IS_META_CTABLE(output->metaType)) || output->tbMeta) {
      ctgError("invalid metaType:%d", output->metaType);
      taosMemoryFreeClear(output->tbMeta);
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan 已提交
267 268
    }

D
dapan1121 已提交
269
    // HANDLE ONLY CHILD TABLE META
D
dapan1121 已提交
270

D
dapan1121 已提交
271
    taosMemoryFreeClear(output->tbMeta);
D
dapan 已提交
272

D
dapan1121 已提交
273 274 275 276 277 278 279 280 281 282 283
    SName stbName = *ctx->pName;
    strcpy(stbName.tname, output->tbName);
    SCtgTbMetaCtx stbCtx = {0};
    stbCtx.flag = ctx->flag;
    stbCtx.pName = &stbName;
    
    CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &stbCtx, pTableMeta));
    if (NULL == *pTableMeta) {
      ctgDebug("stb no longer exist, dbFName:%s, tbName:%s", output->dbFName, ctx->pName->tname);
      continue;
    }
D
dapan 已提交
284

D
dapan1121 已提交
285
    memcpy(*pTableMeta, &output->ctbMeta, sizeof(output->ctbMeta));
D
dapan 已提交
286

D
dapan1121 已提交
287 288
    break;
  }
D
dapan 已提交
289

D
dapan1121 已提交
290
_return:
D
dapan 已提交
291

D
dapan1121 已提交
292 293 294 295 296 297 298
  if (CTG_TABLE_NOT_EXIST(code) && ctx->tbInfo.inCache) {
    char dbFName[TSDB_DB_FNAME_LEN] = {0};
    if (CTG_FLAG_IS_SYS_DB(ctx->flag)) {
      strcpy(dbFName, ctx->pName->dbname);
    } else {
      tNameGetFullDbName(ctx->pName, dbFName);
    }
D
dapan 已提交
299

D
dapan1121 已提交
300 301 302 303 304 305
    if (TSDB_SUPER_TABLE == ctx->tbInfo.tbType) {
      ctgPutRmStbToQueue(pCtg, dbFName, ctx->tbInfo.dbId, ctx->pName->tname, ctx->tbInfo.suid, false);
    } else {
      ctgPutRmTbToQueue(pCtg, dbFName, ctx->tbInfo.dbId, ctx->pName->tname, false);
    }
  }
D
dapan 已提交
306

D
dapan1121 已提交
307
  taosMemoryFreeClear(output);
D
dapan 已提交
308

D
dapan1121 已提交
309 310 311 312
  if (*pTableMeta) {
    ctgDebug("tbmeta returned, tbName:%s, tbType:%d", ctx->pName->tname, (*pTableMeta)->tableType);
    ctgdShowTableMeta(pCtg, ctx->pName->tname, *pTableMeta);
  }
D
dapan 已提交
313 314 315 316

  CTG_RET(code);
}

D
dapan1121 已提交
317

D
dapan1121 已提交
318 319 320
int32_t ctgChkAuth(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* user, const char* dbFName, AUTH_TYPE type, bool *pass) {
  bool inCache = false;
  int32_t code = 0;
D
dapan1121 已提交
321
  
D
dapan1121 已提交
322 323 324
  *pass = false;
  
  CTG_ERR_RET(ctgChkAuthFromCache(pCtg, user, dbFName, type, &inCache, pass));
D
dapan1121 已提交
325

D
dapan1121 已提交
326 327 328
  if (inCache) {
    return TSDB_CODE_SUCCESS;
  }
D
dapan1121 已提交
329

D
dapan1121 已提交
330 331 332 333 334 335
  SGetUserAuthRsp authRsp = {0};
  CTG_ERR_RET(ctgGetUserDbAuthFromMnode(CTG_PARAMS_LIST(), user, &authRsp, NULL));
  
  if (authRsp.superAuth) {
    *pass = true;
    goto _return;
D
dapan1121 已提交
336 337
  }

D
dapan1121 已提交
338 339 340 341
  if (authRsp.createdDbs && taosHashGet(authRsp.createdDbs, dbFName, strlen(dbFName))) {
    *pass = true;
    goto _return;
  }
D
dapan1121 已提交
342

D
dapan1121 已提交
343 344 345 346 347
  if (type == AUTH_TYPE_READ && authRsp.readDbs && taosHashGet(authRsp.readDbs, dbFName, strlen(dbFName))) {
    *pass = true;
  } else if (type == AUTH_TYPE_WRITE && authRsp.writeDbs && taosHashGet(authRsp.writeDbs, dbFName, strlen(dbFName))) {
    *pass = true;
  }
D
dapan1121 已提交
348

D
dapan1121 已提交
349
_return:
D
dapan1121 已提交
350

D
dapan1121 已提交
351
  ctgPutUpdateUserToQueue(pCtg, &authRsp, false);
D
dapan1121 已提交
352

D
dapan1121 已提交
353 354 355
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
356
int32_t ctgGetTbDistVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, SName* pTableName, SArray** pVgList) {
D
dapan1121 已提交
357 358 359 360 361 362
  STableMeta *tbMeta = NULL;
  int32_t code = 0;
  SVgroupInfo vgroupInfo = {0};
  SCtgDBCache* dbCache = NULL;
  SArray *vgList = NULL;
  SDBVgInfo *vgInfo = NULL;
D
dapan1121 已提交
363 364 365
  SCtgTbMetaCtx ctx = {0};
  ctx.pName = pTableName;
  ctx.flag = CTG_FLAG_UNKNOWN_STB;
D
dapan1121 已提交
366 367 368

  *pVgList = NULL;
  
D
dapan1121 已提交
369
  CTG_ERR_JRET(ctgGetTbMeta(CTG_PARAMS_LIST(), &ctx, &tbMeta));
D
dapan1121 已提交
370 371 372 373 374

  char db[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, db);

  SHashObj *vgHash = NULL;  
D
dapan1121 已提交
375
  CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pTrans, pMgmtEps, db, &dbCache, &vgInfo));
D
dapan1121 已提交
376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419

  if (dbCache) {
    vgHash = dbCache->vgInfo->vgHash;
  } else {
    vgHash = vgInfo->vgHash;
  }

  if (tbMeta->tableType == TSDB_SUPER_TABLE) {
    CTG_ERR_JRET(ctgGenerateVgList(pCtg, vgHash, pVgList));
  } else {
    // USE HASH METHOD INSTEAD OF VGID IN TBMETA
    ctgError("invalid method to get none stb vgInfo, tbType:%d", tbMeta->tableType);
    CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT);
    
#if 0  
    int32_t vgId = tbMeta->vgId;
    if (taosHashGetDup(vgHash, &vgId, sizeof(vgId), &vgroupInfo) != 0) {
      ctgWarn("table's vgId not found in vgroup list, vgId:%d, tbName:%s", vgId, tNameGetTableName(pTableName));
      CTG_ERR_JRET(TSDB_CODE_CTG_VG_META_MISMATCH);
    }

    vgList = taosArrayInit(1, sizeof(SVgroupInfo));
    if (NULL == vgList) {
      ctgError("taosArrayInit %d failed", (int32_t)sizeof(SVgroupInfo));
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);    
    }

    if (NULL == taosArrayPush(vgList, &vgroupInfo)) {
      ctgError("taosArrayPush vgroupInfo to array failed, vgId:%d, tbName:%s", vgId, tNameGetTableName(pTableName));
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
    }

    *pVgList = vgList;
    vgList = NULL;
#endif    
  }

_return:

  if (dbCache) {
    ctgReleaseVgInfo(dbCache);
    ctgReleaseDBCache(pCtg, dbCache);
  }

wafwerar's avatar
wafwerar 已提交
420
  taosMemoryFreeClear(tbMeta);
D
dapan1121 已提交
421 422 423

  if (vgInfo) {
    taosHashCleanup(vgInfo->vgHash);
wafwerar's avatar
wafwerar 已提交
424
    taosMemoryFreeClear(vgInfo);
D
dapan1121 已提交
425 426 427 428 429 430 431 432 433 434
  }

  if (vgList) {
    taosArrayDestroy(vgList);
    vgList = NULL;
  }

  CTG_RET(code);
}

D
dapan1121 已提交
435
int32_t catalogInit(SCatalogCfg *cfg) {
D
dapan 已提交
436
  if (gCtgMgmt.pCluster) {
D
dapan 已提交
437
    qError("catalog already initialized");
D
dapan1121 已提交
438
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
439 440
  }

wafwerar's avatar
wafwerar 已提交
441
  atomic_store_8((int8_t*)&gCtgMgmt.exit, false);
D
dapan1121 已提交
442

D
dapan1121 已提交
443
  if (cfg) {
D
dapan 已提交
444
    memcpy(&gCtgMgmt.cfg, cfg, sizeof(*cfg));
H
Haojun Liao 已提交
445

D
dapan 已提交
446 447
    if (gCtgMgmt.cfg.maxDBCacheNum == 0) {
      gCtgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
D
dapan1121 已提交
448 449
    }

D
dapan 已提交
450 451
    if (gCtgMgmt.cfg.maxTblCacheNum == 0) {
      gCtgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TBLMETA_NUMBER;
D
dapan1121 已提交
452
    }
D
dapan1121 已提交
453

D
dapan 已提交
454 455
    if (gCtgMgmt.cfg.dbRentSec == 0) {
      gCtgMgmt.cfg.dbRentSec = CTG_DEFAULT_RENT_SECOND;
D
dapan1121 已提交
456 457
    }

D
dapan 已提交
458 459
    if (gCtgMgmt.cfg.stbRentSec == 0) {
      gCtgMgmt.cfg.stbRentSec = CTG_DEFAULT_RENT_SECOND;
D
dapan1121 已提交
460
    }
D
dapan1121 已提交
461
  } else {
D
dapan 已提交
462 463 464 465
    gCtgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
    gCtgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TBLMETA_NUMBER;
    gCtgMgmt.cfg.dbRentSec = CTG_DEFAULT_RENT_SECOND;
    gCtgMgmt.cfg.stbRentSec = CTG_DEFAULT_RENT_SECOND;
D
dapan 已提交
466 467
  }

D
dapan 已提交
468 469
  gCtgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
  if (NULL == gCtgMgmt.pCluster) {
D
dapan1121 已提交
470 471
    qError("taosHashInit %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER);
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
D
dapan1121 已提交
472 473
  }

D
dapan1121 已提交
474 475 476 477 478 479 480 481 482
  if (tsem_init(&gCtgMgmt.queue.reqSem, 0, 0)) {
    qError("tsem_init failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
    CTG_ERR_RET(TSDB_CODE_CTG_SYS_ERROR);
  }
  
  if (tsem_init(&gCtgMgmt.queue.rspSem, 0, 0)) {
    qError("tsem_init failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
    CTG_ERR_RET(TSDB_CODE_CTG_SYS_ERROR);
  }
D
dapan1121 已提交
483

wafwerar's avatar
wafwerar 已提交
484
  gCtgMgmt.queue.head = taosMemoryCalloc(1, sizeof(SCtgQNode));
D
dapan1121 已提交
485
  if (NULL == gCtgMgmt.queue.head) {
D
dapan1121 已提交
486 487 488
    qError("calloc %d failed", (int32_t)sizeof(SCtgQNode));
    CTG_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }
D
dapan1121 已提交
489
  gCtgMgmt.queue.tail = gCtgMgmt.queue.head;
D
dapan1121 已提交
490

D
dapan1121 已提交
491 492 493 494 495 496
  gCtgMgmt.jobPool = taosOpenRef(200, ctgFreeJob);
  if (gCtgMgmt.jobPool < 0) {
    qError("taosOpenRef failed, error:%s", tstrerror(terrno));
    CTG_ERR_RET(terrno);
  }

D
dapan1121 已提交
497 498
  CTG_ERR_RET(ctgStartUpdateThread());

D
dapan 已提交
499
  qDebug("catalog initialized, maxDb:%u, maxTbl:%u, dbRentSec:%u, stbRentSec:%u", gCtgMgmt.cfg.maxDBCacheNum, gCtgMgmt.cfg.maxTblCacheNum, gCtgMgmt.cfg.dbRentSec, gCtgMgmt.cfg.stbRentSec);
D
dapan1121 已提交
500

D
dapan 已提交
501
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
502 503
}

D
dapan1121 已提交
504
int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
505
  if (NULL == catalogHandle) {
D
dapan1121 已提交
506
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan 已提交
507 508
  }

D
dapan 已提交
509
  if (NULL == gCtgMgmt.pCluster) {
D
dapan 已提交
510
    qError("catalog cluster cache are not ready, clusterId:%"PRIx64, clusterId);
D
dapan1121 已提交
511
    CTG_ERR_RET(TSDB_CODE_CTG_NOT_READY);
D
dapan 已提交
512 513
  }

D
dapan1121 已提交
514 515
  int32_t code = 0;
  SCatalog *clusterCtg = NULL;
D
dapan 已提交
516

D
dapan1121 已提交
517
  while (true) {
D
dapan 已提交
518
    SCatalog **ctg = (SCatalog **)taosHashGet(gCtgMgmt.pCluster, (char*)&clusterId, sizeof(clusterId));
D
dapan 已提交
519

D
dapan1121 已提交
520 521 522 523 524
    if (ctg && (*ctg)) {
      *catalogHandle = *ctg;
      qDebug("got catalog handle from cache, clusterId:%"PRIx64", CTG:%p", clusterId, *ctg);
      return TSDB_CODE_SUCCESS;
    }
D
dapan 已提交
525

wafwerar's avatar
wafwerar 已提交
526
    clusterCtg = taosMemoryCalloc(1, sizeof(SCatalog));
D
dapan1121 已提交
527 528 529 530 531
    if (NULL == clusterCtg) {
      qError("calloc %d failed", (int32_t)sizeof(SCatalog));
      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
    }

D
dapan1121 已提交
532 533
    clusterCtg->clusterId = clusterId;

D
dapan 已提交
534 535
    CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->dbRent, gCtgMgmt.cfg.dbRentSec, CTG_RENT_DB));
    CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->stbRent, gCtgMgmt.cfg.stbRentSec, CTG_RENT_STABLE));
D
dapan1121 已提交
536

D
dapan 已提交
537
    clusterCtg->dbCache = taosHashInit(gCtgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
D
dapan1121 已提交
538 539 540 541 542
    if (NULL == clusterCtg->dbCache) {
      qError("taosHashInit %d dbCache failed", CTG_DEFAULT_CACHE_DB_NUMBER);
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
    }

D
dapan 已提交
543
    code = taosHashPut(gCtgMgmt.pCluster, &clusterId, sizeof(clusterId), &clusterCtg, POINTER_BYTES);
D
dapan1121 已提交
544 545 546 547 548 549 550 551 552 553 554 555 556
    if (code) {
      if (HASH_NODE_EXIST(code)) {
        ctgFreeHandle(clusterCtg);
        continue;
      }
      
      qError("taosHashPut CTG to cache failed, clusterId:%"PRIx64, clusterId);
      CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
    }

    qDebug("add CTG to cache, clusterId:%"PRIx64", CTG:%p", clusterId, clusterCtg);

    break;
D
dapan 已提交
557
  }
D
dapan1121 已提交
558 559

  *catalogHandle = clusterCtg;
D
dapan1121 已提交
560 561

  CTG_CACHE_STAT_ADD(clusterNum, 1);
D
dapan 已提交
562
  
D
dapan1121 已提交
563
  return TSDB_CODE_SUCCESS;
D
dapan1121 已提交
564 565 566 567 568 569 570 571

_return:

  ctgFreeHandle(clusterCtg);
  
  CTG_RET(code);
}

D
dapan1121 已提交
572 573
void catalogFreeHandle(SCatalog* pCtg) {
  if (NULL == pCtg) {
D
dapan1121 已提交
574 575
    return;
  }
D
dapan1121 已提交
576

D
dapan 已提交
577
  if (taosHashRemove(gCtgMgmt.pCluster, &pCtg->clusterId, sizeof(pCtg->clusterId))) {
D
dapan1121 已提交
578
    ctgWarn("taosHashRemove from cluster failed, may already be freed, clusterId:%"PRIx64, pCtg->clusterId);
D
dapan1121 已提交
579 580 581
    return;
  }

D
dapan1121 已提交
582 583
  CTG_CACHE_STAT_SUB(clusterNum, 1);

D
dapan1121 已提交
584
  uint64_t clusterId = pCtg->clusterId;
D
dapan1121 已提交
585
  
D
dapan1121 已提交
586
  ctgFreeHandle(pCtg);
D
dapan1121 已提交
587
  
D
dapan1121 已提交
588
  ctgInfo("handle freed, culsterId:%"PRIx64, clusterId);
D
dapan 已提交
589 590
}

D
dapan1121 已提交
591
int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId, int32_t *tableNum) {
D
dapan1121 已提交
592 593
  CTG_API_ENTER();

D
dapan1121 已提交
594
  if (NULL == pCtg || NULL == dbFName || NULL == version || NULL == dbId) {
D
dapan1121 已提交
595 596 597 598
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }

  SCtgDBCache *dbCache = NULL;
D
dapan1121 已提交
599
  int32_t code = 0;
D
dapan1121 已提交
600

D
dapan1121 已提交
601 602
  CTG_ERR_JRET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache));
  if (NULL == dbCache) {
D
dapan1121 已提交
603
    *version = CTG_DEFAULT_INVALID_VERSION;
D
dapan1121 已提交
604
    CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
605 606
  }

D
dapan1121 已提交
607
  *version = dbCache->vgInfo->vgVersion;
D
dapan1121 已提交
608
  *dbId = dbCache->dbId;
D
dapan1121 已提交
609
  *tableNum = dbCache->vgInfo->numOfTable;
D
dapan1121 已提交
610 611 612

  ctgReleaseVgInfo(dbCache);
  ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
613

D
dapan1121 已提交
614
  ctgDebug("Got db vgVersion from cache, dbFName:%s, vgVersion:%d", dbFName, *version);
D
dapan1121 已提交
615

D
dapan1121 已提交
616
  CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
617 618 619 620

_return:

  CTG_API_LEAVE(code);
D
dapan1121 已提交
621 622
}

D
dapan1121 已提交
623
int32_t catalogGetDBVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* dbFName, SArray** vgroupList) {
D
dapan1121 已提交
624 625
  CTG_API_ENTER();

D
dapan1121 已提交
626
  if (NULL == pCtg || NULL == dbFName || NULL == pTrans || NULL == pMgmtEps || NULL == vgroupList) {
D
dapan1121 已提交
627 628
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }
629

D
dapan1121 已提交
630
  SCtgDBCache* dbCache = NULL;
631
  int32_t code = 0;
D
dapan1121 已提交
632
  SArray *vgList = NULL;
D
dapan1121 已提交
633 634
  SHashObj *vgHash = NULL;
  SDBVgInfo *vgInfo = NULL;
D
dapan1121 已提交
635
  CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pTrans, pMgmtEps, dbFName, &dbCache, &vgInfo));
D
dapan1121 已提交
636 637 638 639
  if (dbCache) {
    vgHash = dbCache->vgInfo->vgHash;
  } else {
    vgHash = vgInfo->vgHash;
D
dapan1121 已提交
640 641
  }

D
dapan1121 已提交
642
  CTG_ERR_JRET(ctgGenerateVgList(pCtg, vgHash, &vgList));
D
dapan1121 已提交
643 644 645 646 647

  *vgroupList = vgList;
  vgList = NULL;

_return:
D
dapan1121 已提交
648 649

  if (dbCache) {
D
dapan1121 已提交
650 651
    ctgReleaseVgInfo(dbCache);
    ctgReleaseDBCache(pCtg, dbCache);
D
dapan1121 已提交
652 653
  }

D
dapan1121 已提交
654 655
  if (vgInfo) {
    taosHashCleanup(vgInfo->vgHash);
wafwerar's avatar
wafwerar 已提交
656
    taosMemoryFreeClear(vgInfo);
D
dapan1121 已提交
657 658
  }

D
dapan1121 已提交
659
  CTG_API_LEAVE(code);  
D
dapan1121 已提交
660 661 662
}


D
dapan1121 已提交
663
int32_t catalogUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SDBVgInfo* dbInfo) {
D
dapan1121 已提交
664
  CTG_API_ENTER();
D
dapan1121 已提交
665 666

  int32_t code = 0;
D
dapan1121 已提交
667
  
D
dapan1121 已提交
668
  if (NULL == pCtg || NULL == dbFName || NULL == dbInfo) {
D
dapan1121 已提交
669
    ctgFreeVgInfo(dbInfo);
D
dapan1121 已提交
670 671 672
    CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT);
  }

D
dapan1121 已提交
673
  code = ctgPutUpdateVgToQueue(pCtg, dbFName, dbId, dbInfo, false);
D
dapan1121 已提交
674

D
dapan1121 已提交
675 676
_return:

D
dapan1121 已提交
677
  CTG_API_LEAVE(code);
D
dapan1121 已提交
678 679 680
}


D
dapan1121 已提交
681 682 683
int32_t catalogRemoveDB(SCatalog* pCtg, const char* dbFName, uint64_t dbId) {
  CTG_API_ENTER();

D
dapan1121 已提交
684 685
  int32_t code = 0;
  
D
dapan1121 已提交
686 687
  if (NULL == pCtg || NULL == dbFName) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
688 689
  }

D
dapan1121 已提交
690
  if (NULL == pCtg->dbCache) {
D
dapan1121 已提交
691
    CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
692
  }
D
dapan1121 已提交
693

D
dapan1121 已提交
694
  CTG_ERR_JRET(ctgPutRmDBToQueue(pCtg, dbFName, dbId));
D
dapan 已提交
695

D
dapan1121 已提交
696
  CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
697
  
D
dapan1121 已提交
698 699
_return:

D
dapan1121 已提交
700
  CTG_API_LEAVE(code);
D
dapan1121 已提交
701 702
}

D
dapan1121 已提交
703
int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId, SEpSet *epSet) {
704
  return 0;
D
dapan1121 已提交
705
}
D
dapan1121 已提交
706

D
dapan1121 已提交
707
int32_t catalogRemoveTableMeta(SCatalog* pCtg, SName* pTableName) {
D
dapan 已提交
708 709 710 711 712 713 714 715 716 717 718 719
  CTG_API_ENTER();

  int32_t code = 0;
  
  if (NULL == pCtg || NULL == pTableName) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }

  if (NULL == pCtg->dbCache) {
    CTG_API_LEAVE(TSDB_CODE_SUCCESS);
  }

D
dapan1121 已提交
720
  CTG_ERR_JRET(ctgRemoveTbMetaFromCache(pCtg, pTableName, true));
D
dapan 已提交
721 722

_return:
D
dapan1121 已提交
723
  
D
dapan 已提交
724 725 726 727
  CTG_API_LEAVE(code);
}


D
dapan1121 已提交
728 729 730
int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, const char* stbName, uint64_t suid) {
  CTG_API_ENTER();

D
dapan 已提交
731 732
  int32_t code = 0;
  
D
dapan1121 已提交
733 734
  if (NULL == pCtg || NULL == dbFName || NULL == stbName) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan 已提交
735 736
  }

D
dapan1121 已提交
737
  if (NULL == pCtg->dbCache) {
D
dapan1121 已提交
738
    CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan 已提交
739
  }
D
dapan1121 已提交
740

D
dapan1121 已提交
741
  CTG_ERR_JRET(ctgPutRmStbToQueue(pCtg, dbFName, dbId, stbName, suid, true));
D
dapan 已提交
742

D
dapan1121 已提交
743
  CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan 已提交
744
  
D
dapan1121 已提交
745 746
_return:

D
dapan1121 已提交
747
  CTG_API_LEAVE(code);
D
dapan 已提交
748 749
}

D
dapan1121 已提交
750
int32_t catalogGetTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
D
dapan1121 已提交
751 752
  CTG_API_ENTER();

D
dapan1121 已提交
753 754 755 756 757
  SCtgTbMetaCtx ctx = {0};
  ctx.pName = (SName*)pTableName;
  ctx.flag = CTG_FLAG_UNKNOWN_STB;
  
  CTG_API_LEAVE(ctgGetTbMeta(pCtg, pTrans, pMgmtEps, &ctx, pTableMeta));
D
dapan1121 已提交
758
}
D
dapan1121 已提交
759

D
dapan1121 已提交
760
int32_t catalogGetSTableMeta(SCatalog* pCtg, void * pTrans, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
D
dapan1121 已提交
761 762
  CTG_API_ENTER();

D
dapan1121 已提交
763 764 765 766 767
  SCtgTbMetaCtx ctx = {0};
  ctx.pName = (SName*)pTableName;
  ctx.flag = CTG_FLAG_STB;

  CTG_API_LEAVE(ctgGetTbMeta(CTG_PARAMS_LIST(), &ctx, pTableMeta));
D
dapan1121 已提交
768 769
}

D
dapan1121 已提交
770 771 772 773 774 775 776
int32_t catalogUpdateSTableMeta(SCatalog* pCtg, STableMetaRsp *rspMsg) {
  CTG_API_ENTER();

  if (NULL == pCtg || NULL == rspMsg) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }

wafwerar's avatar
wafwerar 已提交
777
  STableMetaOutput *output = taosMemoryCalloc(1, sizeof(STableMetaOutput));
D
dapan1121 已提交
778 779 780 781 782
  if (NULL == output) {
    ctgError("malloc %d failed", (int32_t)sizeof(STableMetaOutput));
    CTG_API_LEAVE(TSDB_CODE_CTG_MEM_ERROR);
  }
  
D
dapan1121 已提交
783 784
  int32_t code = 0;

D
dapan1121 已提交
785 786
  strcpy(output->dbFName, rspMsg->dbFName);
  strcpy(output->tbName, rspMsg->tbName);
D
dapan1121 已提交
787

D
dapan1121 已提交
788
  output->dbId = rspMsg->dbId;
D
dapan1121 已提交
789
  
D
dapan1121 已提交
790
  SET_META_TYPE_TABLE(output->metaType);
D
dapan1121 已提交
791
  
D
dapan1121 已提交
792
  CTG_ERR_JRET(queryCreateTableMetaFromMsg(rspMsg, true, &output->tbMeta));
D
dapan1121 已提交
793

D
dapan1121 已提交
794
  CTG_ERR_JRET(ctgPutUpdateTbToQueue(pCtg, output, false));
D
dapan 已提交
795

D
dapan1121 已提交
796 797
  CTG_API_LEAVE(code);
  
D
dapan1121 已提交
798 799
_return:

wafwerar's avatar
wafwerar 已提交
800 801
  taosMemoryFreeClear(output->tbMeta);
  taosMemoryFreeClear(output);
D
dapan1121 已提交
802
  
D
dapan1121 已提交
803
  CTG_API_LEAVE(code);
D
dapan1121 已提交
804 805
}

D
dapan1121 已提交
806
int32_t catalogChkTbMetaVersion(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, SArray* pTables) {
D
dapan1121 已提交
807 808 809 810 811
  CTG_API_ENTER();

  if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pTables) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }
D
dapan1121 已提交
812

D
dapan1121 已提交
813 814
  SName name;
  int32_t sver = 0;
D
dapan1121 已提交
815
  int32_t tver = 0;
D
dapan1121 已提交
816 817 818
  int32_t tbNum = taosArrayGetSize(pTables);
  for (int32_t i = 0; i < tbNum; ++i) {
    STbSVersion* pTb = (STbSVersion*)taosArrayGet(pTables, i);
D
dapan1121 已提交
819 820 821 822
    if (NULL == pTb->tbFName || 0 == pTb->tbFName[0]) {
      continue;
    }
    
D
dapan1121 已提交
823
    tNameFromString(&name, pTb->tbFName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
D
dapan1121 已提交
824

D
dapan1121 已提交
825 826 827 828
    if (CTG_IS_SYS_DBNAME(name.dbname)) {
      continue;
    }

L
Liu Jicong 已提交
829
    int32_t  tbType = 0;
D
dapan1121 已提交
830
    uint64_t suid = 0;
L
Liu Jicong 已提交
831
    char     stbName[TSDB_TABLE_FNAME_LEN];
D
dapan1121 已提交
832 833
    ctgReadTbVerFromCache(pCtg, &name, &sver, &tver, &tbType, &suid, stbName);
    if ((sver >= 0 && sver < pTb->sver) || (tver >= 0 && tver < pTb->tver)) {
D
dapan1121 已提交
834 835 836 837
      switch (tbType) {
        case TSDB_CHILD_TABLE: {
          SName stb = name;
          strcpy(stb.tname, stbName);
L
Liu Jicong 已提交
838
          catalogRemoveTableMeta(pCtg, &stb);
D
dapan1121 已提交
839 840 841 842 843 844 845 846 847 848
          break;
        }
        case TSDB_SUPER_TABLE:
        case TSDB_NORMAL_TABLE:
          catalogRemoveTableMeta(pCtg, &name);
          break;
        default:
          ctgError("ignore table type %d", tbType);
          break;
      }
D
dapan1121 已提交
849 850 851 852
    }
  }

  CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
853 854 855
}


D
dapan1121 已提交
856 857 858 859 860 861 862 863 864
int32_t catalogRefreshDBVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* dbFName) {
  CTG_API_ENTER();

  if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == dbFName) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }

  CTG_API_LEAVE(ctgRefreshDBVgInfo(pCtg, pTrans, pMgmtEps, dbFName));
}
D
dapan1121 已提交
865

D
dapan1121 已提交
866
int32_t catalogRefreshTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) {
D
dapan1121 已提交
867 868
  CTG_API_ENTER();

D
dapan1121 已提交
869
  if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pTableName) {
D
dapan1121 已提交
870 871 872
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }

D
dapan1121 已提交
873 874 875 876 877
  SCtgTbMetaCtx ctx = {0};
  ctx.pName = (SName*)pTableName;
  ctx.flag = CTG_FLAG_FORCE_UPDATE | CTG_FLAG_MAKE_STB(isSTable);

  CTG_API_LEAVE(ctgRefreshTbMeta(CTG_PARAMS_LIST(), &ctx, NULL, true));
878
}
879

D
dapan1121 已提交
880
int32_t catalogRefreshGetTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable) {
D
dapan1121 已提交
881 882
  CTG_API_ENTER();

D
dapan1121 已提交
883 884 885 886 887
  SCtgTbMetaCtx ctx = {0};
  ctx.pName = (SName*)pTableName;
  ctx.flag = CTG_FLAG_FORCE_UPDATE | CTG_FLAG_MAKE_STB(isSTable);

  CTG_API_LEAVE(ctgGetTbMeta(CTG_PARAMS_LIST(), &ctx, pTableMeta));
D
dapan1121 已提交
888 889
}

D
dapan1121 已提交
890
int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgList) {
D
dapan1121 已提交
891
  CTG_API_ENTER();
D
dapan1121 已提交
892

D
dapan1121 已提交
893
  if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pTableName || NULL == pVgList) {
D
dapan1121 已提交
894 895
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }
D
dapan1121 已提交
896

D
dapan1121 已提交
897
  if (CTG_IS_SYS_DBNAME(pTableName->dbname)) {
D
dapan1121 已提交
898 899 900
    ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname);
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }
D
dapan1121 已提交
901

D
dapan1121 已提交
902
  CTG_API_LEAVE(ctgGetTbDistVgInfo(pCtg, pTrans, pMgmtEps, (SName*)pTableName, pVgList));
D
dapan1121 已提交
903 904 905
}


D
dapan1121 已提交
906
int32_t catalogGetTableHashVgroup(SCatalog *pCtg, void *pTrans, const SEpSet *pMgmtEps, const SName *pTableName, SVgroupInfo *pVgroup) {
D
dapan1121 已提交
907 908
  CTG_API_ENTER();

D
dapan1121 已提交
909
  if (CTG_IS_SYS_DBNAME(pTableName->dbname)) {
D
dapan1121 已提交
910 911 912 913
    ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname);
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }

D
dapan1121 已提交
914 915
  SCtgDBCache* dbCache = NULL;
  int32_t code = 0;
H
Haojun Liao 已提交
916 917
  char db[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, db);
D
dapan1121 已提交
918

D
dapan1121 已提交
919
  SDBVgInfo *vgInfo = NULL;
D
dapan1121 已提交
920
  CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pTrans, pMgmtEps, db, &dbCache, &vgInfo));
D
dapan1121 已提交
921

D
dapan1121 已提交
922
  CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, vgInfo ? vgInfo : dbCache->vgInfo, pTableName, pVgroup));
D
dapan1121 已提交
923

D
dapan1121 已提交
924
_return:
D
dapan1121 已提交
925

D
dapan1121 已提交
926
  if (dbCache) {
D
dapan1121 已提交
927 928 929 930 931 932
    ctgReleaseVgInfo(dbCache);
    ctgReleaseDBCache(pCtg, dbCache);
  }

  if (vgInfo) {
    taosHashCleanup(vgInfo->vgHash);
wafwerar's avatar
wafwerar 已提交
933
    taosMemoryFreeClear(vgInfo);
D
dapan1121 已提交
934
  }
D
dapan1121 已提交
935

D
dapan1121 已提交
936
  CTG_API_LEAVE(code);
D
dapan1121 已提交
937 938 939
}


D
dapan1121 已提交
940
int32_t catalogGetAllMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp) {
D
dapan1121 已提交
941 942
  CTG_API_ENTER();

D
dapan1121 已提交
943
  if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pReq || NULL == pRsp) {
D
dapan1121 已提交
944 945 946
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }

D
dapan1121 已提交
947
  int32_t code = 0;
D
dapan1121 已提交
948
  pRsp->pTableMeta = NULL;
D
dapan1121 已提交
949

D
dapan1121 已提交
950 951
  if (pReq->pTableMeta) {
    int32_t tbNum = (int32_t)taosArrayGetSize(pReq->pTableMeta);
D
dapan1121 已提交
952
    if (tbNum <= 0) {
D
dapan1121 已提交
953
      ctgError("empty table name list, tbNum:%d", tbNum);
D
dapan1121 已提交
954
      CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
955
    }
H
Haojun Liao 已提交
956

D
dapan1121 已提交
957 958
    pRsp->pTableMeta = taosArrayInit(tbNum, POINTER_BYTES);
    if (NULL == pRsp->pTableMeta) {
D
dapan1121 已提交
959
      ctgError("taosArrayInit %d failed", tbNum);
D
dapan1121 已提交
960
      CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
D
dapan1121 已提交
961 962 963
    }
    
    for (int32_t i = 0; i < tbNum; ++i) {
D
dapan1121 已提交
964
      SName *name = taosArrayGet(pReq->pTableMeta, i);
D
dapan1121 已提交
965
      STableMeta *pTableMeta = NULL;
D
dapan1121 已提交
966 967 968
      SCtgTbMetaCtx ctx = {0};
      ctx.pName = name;
      ctx.flag = CTG_FLAG_UNKNOWN_STB;
D
dapan1121 已提交
969
      
D
dapan1121 已提交
970
      CTG_ERR_JRET(ctgGetTbMeta(CTG_PARAMS_LIST(), &ctx, &pTableMeta));
D
dapan1121 已提交
971 972 973

      if (NULL == taosArrayPush(pRsp->pTableMeta, &pTableMeta)) {
        ctgError("taosArrayPush failed, idx:%d", i);
wafwerar's avatar
wafwerar 已提交
974
        taosMemoryFreeClear(pTableMeta);
D
dapan1121 已提交
975 976 977 978 979
        CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
      }
    }
  }

D
dapan1121 已提交
980
  if (pReq->qNodeRequired) {
D
dapan 已提交
981
    pRsp->pQnodeList = taosArrayInit(10, sizeof(SQueryNodeAddr));
D
dapan1121 已提交
982
    CTG_ERR_JRET(ctgGetQnodeListFromMnode(CTG_PARAMS_LIST(), pRsp->pQnodeList, NULL));
D
dapan1121 已提交
983 984
  }

D
dapan1121 已提交
985
  CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan1121 已提交
986 987

_return:  
D
dapan1121 已提交
988

D
dapan1121 已提交
989 990 991 992
  if (pRsp->pTableMeta) {
    int32_t aSize = taosArrayGetSize(pRsp->pTableMeta);
    for (int32_t i = 0; i < aSize; ++i) {
      STableMeta *pMeta = taosArrayGetP(pRsp->pTableMeta, i);
wafwerar's avatar
wafwerar 已提交
993
      taosMemoryFreeClear(pMeta);
D
dapan1121 已提交
994 995 996
    }
    
    taosArrayDestroy(pRsp->pTableMeta);
D
dapan1121 已提交
997
    pRsp->pTableMeta = NULL;
D
dapan1121 已提交
998
  }
D
dapan 已提交
999
  
D
dapan1121 已提交
1000
  CTG_API_LEAVE(code);
1001
}
D
dapan 已提交
1002

D
dapan1121 已提交
1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031
int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param, int64_t* jobId) {
  CTG_API_ENTER();

  if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pReq || NULL == fp || NULL == param) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }

  int32_t code = 0;
  SCtgJob *pJob = NULL;
  CTG_ERR_JRET(ctgInitJob(CTG_PARAMS_LIST(), &pJob, reqId, pReq, fp, param));

  CTG_ERR_JRET(ctgLaunchJob(pJob));

  *jobId = pJob->refId;
  
_return:

  if (pJob) {
    taosReleaseRef(gCtgMgmt.jobPool, pJob->refId);

    if (code) {
      taosRemoveRef(gCtgMgmt.jobPool, pJob->refId);
    }
  }
  
  CTG_API_LEAVE(code);
}

int32_t catalogGetQnodeList(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, SArray* pQnodeList) {
D
dapan1121 已提交
1032
  CTG_API_ENTER();
D
dapan1121 已提交
1033 1034
  
  int32_t code = 0;
D
dapan1121 已提交
1035
  if (NULL == pCtg || NULL == pTrans  || NULL == pMgmtEps || NULL == pQnodeList) {
D
dapan1121 已提交
1036 1037 1038
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }

D
dapan1121 已提交
1039
  CTG_ERR_JRET(ctgGetQnodeListFromMnode(CTG_PARAMS_LIST(), pQnodeList, NULL));
D
dapan1121 已提交
1040 1041

_return:
D
dapan 已提交
1042

D
dapan1121 已提交
1043
  CTG_API_LEAVE(TSDB_CODE_SUCCESS);
D
dapan 已提交
1044 1045
}

D
dapan1121 已提交
1046
int32_t catalogGetExpiredSTables(SCatalog* pCtg, SSTableMetaVersion **stables, uint32_t *num) {
D
dapan1121 已提交
1047 1048
  CTG_API_ENTER();

D
dapan1121 已提交
1049 1050
  if (NULL == pCtg || NULL == stables || NULL == num) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
D
dapan1121 已提交
1051 1052
  }

D
dapan1121 已提交
1053 1054 1055 1056
  CTG_API_LEAVE(ctgMetaRentGet(&pCtg->stbRent, (void **)stables, num, sizeof(SSTableMetaVersion)));
}

int32_t catalogGetExpiredDBs(SCatalog* pCtg, SDbVgVersion **dbs, uint32_t *num) {
D
dapan1121 已提交
1057
  CTG_API_ENTER();
D
dapan1121 已提交
1058 1059 1060 1061
  
  if (NULL == pCtg || NULL == dbs || NULL == num) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }
D
dapan1121 已提交
1062

D
dapan1121 已提交
1063
  CTG_API_LEAVE(ctgMetaRentGet(&pCtg->dbRent, (void **)dbs, num, sizeof(SDbVgVersion)));
D
dapan1121 已提交
1064 1065
}

D
dapan 已提交
1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084
int32_t catalogGetExpiredUsers(SCatalog* pCtg, SUserAuthVersion **users, uint32_t *num) {
  CTG_API_ENTER();
  
  if (NULL == pCtg || NULL == users || NULL == num) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }

  *num = taosHashGetSize(pCtg->userCache);
  if (*num > 0) {
    *users = taosMemoryCalloc(*num, sizeof(SUserAuthVersion));
    if (NULL == *users) {
      ctgError("calloc %d userAuthVersion failed", *num);
      CTG_API_LEAVE(TSDB_CODE_OUT_OF_MEMORY);
    }
  }

  uint32_t i = 0;
  SCtgUserAuth *pAuth = taosHashIterate(pCtg->userCache, NULL);
  while (pAuth != NULL) {
D
fix bug  
dapan1121 已提交
1085 1086 1087 1088
    size_t len = 0;
    void *key = taosHashGetKey(pAuth, &len);
    strncpy((*users)[i].user, key, len);
    (*users)[i].user[len] = 0;
D
dapan 已提交
1089
    (*users)[i].version = pAuth->version;
D
fix bug  
dapan1121 已提交
1090
    ++i;
D
dapan 已提交
1091 1092 1093 1094 1095 1096 1097
    pAuth = taosHashIterate(pCtg->userCache, pAuth);
  }

  CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}


D
dapan1121 已提交
1098
int32_t catalogGetDBCfg(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* dbFName, SDbCfgInfo* pDbCfg) {
D
dapan1121 已提交
1099 1100
  CTG_API_ENTER();
  
D
dapan1121 已提交
1101
  if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == dbFName || NULL == pDbCfg) {
D
dapan1121 已提交
1102 1103 1104
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }

D
dapan1121 已提交
1105
  CTG_API_LEAVE(ctgGetDBCfgFromMnode(CTG_PARAMS_LIST(), dbFName, pDbCfg, NULL));
D
dapan1121 已提交
1106
}
D
dapan 已提交
1107

D
dapan1121 已提交
1108
int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* indexName, SIndexInfo* pInfo) {
D
dapan1121 已提交
1109 1110
  CTG_API_ENTER();
  
D
dapan1121 已提交
1111
  if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == indexName || NULL == pInfo) {
D
dapan1121 已提交
1112 1113 1114
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }

D
dapan1121 已提交
1115
  CTG_API_LEAVE(ctgGetIndexInfoFromMnode(CTG_PARAMS_LIST(), indexName, pInfo, NULL));
D
dapan1121 已提交
1116 1117
}

D
dapan1121 已提交
1118
int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo* pInfo) {
D
dapan1121 已提交
1119 1120
  CTG_API_ENTER();
  
D
dapan1121 已提交
1121
  if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == funcName || NULL == pInfo) {
D
dapan1121 已提交
1122 1123 1124
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }

D
dapan1121 已提交
1125
  int32_t code = 0;
D
dapan1121 已提交
1126
  CTG_ERR_JRET(ctgGetUdfInfoFromMnode(CTG_PARAMS_LIST(), funcName, pInfo, NULL));
D
dapan1121 已提交
1127 1128 1129 1130
  
_return:
  
  CTG_API_LEAVE(code);
D
dapan1121 已提交
1131 1132
}

D
dapan1121 已提交
1133
int32_t catalogChkAuth(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* user, const char* dbFName, AUTH_TYPE type, bool *pass) {
D
dapan 已提交
1134 1135
  CTG_API_ENTER();
  
D
dapan1121 已提交
1136
  if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == user || NULL == dbFName || NULL == pass) {
D
dapan 已提交
1137 1138 1139 1140
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }

  int32_t code = 0;
D
dapan1121 已提交
1141
  CTG_ERR_JRET(ctgChkAuth(CTG_PARAMS_LIST(), user, dbFName, type, pass));
D
dapan 已提交
1142 1143 1144 1145 1146 1147
  
_return:

  CTG_API_LEAVE(code);
}

D
dapan 已提交
1148 1149 1150 1151 1152 1153 1154
int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth) {
  CTG_API_ENTER();

  if (NULL == pCtg || NULL == pAuth) {
    CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
  }

D
dapan1121 已提交
1155
  CTG_API_LEAVE(ctgPutUpdateUserToQueue(pCtg, pAuth, false));
D
dapan 已提交
1156 1157
}

D
dapan1121 已提交
1158

D
dapan 已提交
1159
void catalogDestroy(void) {
D
dapan1121 已提交
1160 1161
  qInfo("start to destroy catalog");
  
wafwerar's avatar
wafwerar 已提交
1162
  if (NULL == gCtgMgmt.pCluster || atomic_load_8((int8_t*)&gCtgMgmt.exit)) {
D
dapan1121 已提交
1163 1164 1165
    return;
  }

wafwerar's avatar
wafwerar 已提交
1166
  atomic_store_8((int8_t*)&gCtgMgmt.exit, true);
D
dapan 已提交
1167

D
dapan1121 已提交
1168 1169 1170 1171 1172 1173 1174
  if (tsem_post(&gCtgMgmt.queue.reqSem)) {
    qError("tsem_post failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
  }
  
  if (tsem_post(&gCtgMgmt.queue.rspSem)) {
    qError("tsem_post failed, error:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
  }
D
dapan1121 已提交
1175

D
dapan1121 已提交
1176
  while (CTG_IS_LOCKED(&gCtgMgmt.lock)) {
wafwerar's avatar
wafwerar 已提交
1177
    taosUsleep(1);
D
dapan1121 已提交
1178 1179
  }
  
D
dapan 已提交
1180
  CTG_LOCK(CTG_WRITE, &gCtgMgmt.lock);
D
dapan1121 已提交
1181

D
dapan1121 已提交
1182
  SCatalog *pCtg = NULL;
D
dapan 已提交
1183
  void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
D
dapan1121 已提交
1184
  while (pIter) {
D
dapan1121 已提交
1185
    pCtg = *(SCatalog **)pIter;
D
dapan1121 已提交
1186

D
dapan1121 已提交
1187 1188
    if (pCtg) {
      catalogFreeHandle(pCtg);
D
dapan1121 已提交
1189 1190
    }
    
D
dapan 已提交
1191
    pIter = taosHashIterate(gCtgMgmt.pCluster, pIter);
D
dapan 已提交
1192
  }
D
dapan1121 已提交
1193
  
D
dapan 已提交
1194 1195
  taosHashCleanup(gCtgMgmt.pCluster);
  gCtgMgmt.pCluster = NULL;
D
dapan1121 已提交
1196

wafwerar's avatar
wafwerar 已提交
1197
  if (CTG_IS_LOCKED(&gCtgMgmt.lock) == TD_RWLATCH_WRITE_FLAG_COPY) CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.lock);
D
dapan1121 已提交
1198

D
dapan1121 已提交
1199
  qInfo("catalog destroyed");
D
dapan 已提交
1200 1201 1202 1203
}