ctgUtil.c 36.1 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "catalogInt.h"
H
Hongze Cheng 已提交
17
#include "query.h"
D
dapan1121 已提交
18
#include "systable.h"
H
Hongze Cheng 已提交
19 20
#include "tname.h"
#include "trpc.h"
D
dapan1121 已提交
21

22 23 24 25 26 27 28
void ctgFreeMsgSendParam(void* param) {
  if (NULL == param) {
    return;
  }

  SCtgTaskCallbackParam* pParam = (SCtgTaskCallbackParam*)param;
  taosArrayDestroy(pParam->taskId);
D
dapan1121 已提交
29
  taosArrayDestroy(pParam->msgIdx);
30 31 32

  taosMemoryFree(param);
}
D
dapan1121 已提交
33

D
dapan1121 已提交
34 35 36 37 38 39 40 41
/*
 * Free the payload buffer (msg) of one SBatchMsg.
 *
 * Only the payload is released; the SBatchMsg storage itself is not freed
 * here. A NULL argument is ignored.
 */
void ctgFreeBatchMsg(void* msg) {
  if (msg != NULL) {
    taosMemoryFree(((SBatchMsg*)msg)->msg);
  }
}

H
Hongze Cheng 已提交
42
/*
 * Release the arrays held by a batch: the message array (each element's
 * payload is freed through ctgFreeBatchMsg) and the task-id array.
 *
 * The SCtgBatch struct itself is not freed. A NULL batch is ignored.
 */
void ctgFreeBatch(SCtgBatch* pBatch) {
  if (pBatch != NULL) {
    taosArrayDestroyEx(pBatch->pMsgs, ctgFreeBatchMsg);
    taosArrayDestroy(pBatch->pTaskIds);
  }
}

H
Hongze Cheng 已提交
51
/*
 * Free every SCtgBatch stored in the hash table, then clean up the table.
 *
 * Each entry is released with ctgFreeBatch before the table is handed to
 * taosHashCleanup.
 */
void ctgFreeBatchs(SHashObj* pBatchs) {
  for (void* pIter = taosHashIterate(pBatchs, NULL); pIter != NULL; pIter = taosHashIterate(pBatchs, pIter)) {
    ctgFreeBatch((SCtgBatch*)pIter);
  }

  taosHashCleanup(pBatchs);
}

H
Hongze Cheng 已提交
64
/*
 * Map a catalog task type to a short, human-readable label.
 *
 * Returns a pointer to a string literal, so the caller must neither modify
 * nor free it. Task types not listed below yield "unknown".
 */
char* ctgTaskTypeStr(CTG_TASK_TYPE type) {
  char* label = "unknown";

  switch (type) {
    case CTG_TASK_GET_QNODE:
      label = "[get qnode list]";
      break;
    case CTG_TASK_GET_DNODE:
      label = "[get dnode list]";
      break;
    case CTG_TASK_GET_DB_VGROUP:
      label = "[get db vgroup]";
      break;
    case CTG_TASK_GET_DB_CFG:
      label = "[get db cfg]";
      break;
    case CTG_TASK_GET_DB_INFO:
      label = "[get db info]";
      break;
    case CTG_TASK_GET_TB_META:
      label = "[get table meta]";
      break;
    case CTG_TASK_GET_TB_HASH:
      label = "[get table hash]";
      break;
    case CTG_TASK_GET_TB_INDEX:
      label = "[get table index]";
      break;
    case CTG_TASK_GET_TB_CFG:
      label = "[get table cfg]";
      break;
    case CTG_TASK_GET_INDEX:
      label = "[get index]";
      break;
    case CTG_TASK_GET_UDF:
      label = "[get udf]";
      break;
    case CTG_TASK_GET_USER:
      label = "[get user]";
      break;
    case CTG_TASK_GET_SVR_VER:
      label = "[get svr ver]";
      break;
    case CTG_TASK_GET_TB_META_BATCH:
      label = "[bget table meta]";
      break;
    case CTG_TASK_GET_TB_HASH_BATCH:
      label = "[bget table hash]";
      break;
    default:
      break;
  }

  return label;
}

H
Hongze Cheng 已提交
101
/*
 * Free a queue node together with its attached operation, if any.
 *
 * When node->op is set, the operation's data buffer is freed first, then the
 * operation itself; finally the node is freed. A NULL node is ignored.
 */
void ctgFreeQNode(SCtgQNode* node) {
  if (!node) {
    return;
  }

  if (NULL != node->op) {
    taosMemoryFree(node->op->data);
    taosMemoryFree(node->op);
  }

  taosMemoryFree(node);
}

H
Hongze Cheng 已提交
114
void ctgFreeSTableIndex(void* info) {
D
dapan1121 已提交
115 116 117 118
  if (NULL == info) {
    return;
  }

H
Hongze Cheng 已提交
119
  STableIndex* pInfo = (STableIndex*)info;
D
dapan1121 已提交
120 121 122 123

  taosArrayDestroyEx(pInfo->pIndex, tFreeSTableIndexInfo);
}

D
dapan1121 已提交
124 125 126
/*
 * Destroy every result array held by an SMetaData and reset the fields to
 * NULL, then free the server-version buffer.
 *
 * Only the array containers are destroyed with taosArrayDestroy.
 * NOTE(review): per-element cleanup for pDbVgroup (nested arrays) and pDbCfg
 * (pRetensions) was present only as disabled code in this function;
 * presumably those payloads are released elsewhere -- confirm.
 *
 * pData must be non-NULL; the SMetaData struct itself is not freed.
 */
void ctgFreeSMetaData(SMetaData* pData) {
/* Destroy one array field and clear the pointer so a second call is harmless. */
#define CTG_DROP_META_ARRAY(field)  \
  do {                              \
    taosArrayDestroy(pData->field); \
    pData->field = NULL;            \
  } while (0)

  CTG_DROP_META_ARRAY(pTableMeta);
  CTG_DROP_META_ARRAY(pDbVgroup);
  CTG_DROP_META_ARRAY(pTableHash);
  CTG_DROP_META_ARRAY(pTableIndex);
  CTG_DROP_META_ARRAY(pUdfList);
  CTG_DROP_META_ARRAY(pDbCfg);
  CTG_DROP_META_ARRAY(pDbInfo);
  CTG_DROP_META_ARRAY(pIndex);
  CTG_DROP_META_ARRAY(pUser);
  CTG_DROP_META_ARRAY(pQnodeList);
  CTG_DROP_META_ARRAY(pDnodeList);
  CTG_DROP_META_ARRAY(pTableCfg);

#undef CTG_DROP_META_ARRAY

  taosMemoryFreeClear(pData->pSvrVer);
}

H
Hongze Cheng 已提交
176
/*
 * Clean up the three per-user database hash tables (created / readable /
 * writable) of a cached user-auth entry.
 *
 * The SCtgUserAuth struct itself is not freed; userCache must be non-NULL.
 */
void ctgFreeSCtgUserAuth(SCtgUserAuth* userCache) {
  taosHashCleanup(userCache->createdDbs);

  taosHashCleanup(userCache->readDbs);

  taosHashCleanup(userCache->writeDbs);
}

H
Hongze Cheng 已提交
182
/*
 * Release the slot table of a rent manager.
 *
 * For each of the slotNum slots the meta array (if present) is destroyed and
 * its pointer cleared; the slot table is then freed and reset. Does nothing
 * when the manager has no slot table. mgmt must be non-NULL.
 */
void ctgFreeMetaRent(SCtgRentMgmt* mgmt) {
  if (mgmt->slots != NULL) {
    for (int32_t idx = 0; idx < mgmt->slotNum; ++idx) {
      SCtgRentSlot* pSlot = mgmt->slots + idx;
      if (pSlot->meta) {
        taosArrayDestroy(pSlot->meta);
        pSlot->meta = NULL;
      }
    }

    taosMemoryFreeClear(mgmt->slots);
  }
}

H
Hongze Cheng 已提交
198
/*
 * Drop the super-table cache of a database cache entry.
 *
 * The entry count is sampled before the hash table is cleaned up so that the
 * numOfStb cache statistic can be decremented by the number of removed
 * entries. Does nothing when no stb cache exists.
 */
void ctgFreeStbMetaCache(SCtgDBCache* dbCache) {
  if (dbCache->stbCache != NULL) {
    int32_t removedStbs = taosHashGetSize(dbCache->stbCache);

    taosHashCleanup(dbCache->stbCache);
    dbCache->stbCache = NULL;

    CTG_CACHE_STAT_DEC(numOfStb, removedStbs);
  }
}

H
Hongze Cheng 已提交
209
/*
 * Release the contents of one table cache entry: the table meta and, when
 * present, the table index (its index array elements are released via
 * tFreeSTableIndexInfo).
 *
 * The SCtgTbCache struct itself is not freed. The meta pointer is logged at
 * debug level before it is released.
 */
void ctgFreeTbCacheImpl(SCtgTbCache* pCache) {
  qDebug("tbMeta freed, p:%p", pCache->pMeta);
  taosMemoryFreeClear(pCache->pMeta);

  if (NULL == pCache->pIndex) {
    return;
  }

  taosArrayDestroyEx(pCache->pIndex->pIndex, tFreeSTableIndexInfo);
  taosMemoryFreeClear(pCache->pIndex);
}
D
dapan1121 已提交
217

H
Hongze Cheng 已提交
218
/*
 * Drop the table cache of a database cache entry.
 *
 * Every cached table entry is released with ctgFreeTbCacheImpl, the hash
 * table is cleaned up, and the numOfTbl statistic is decremented by the
 * number of entries that were in the table. Does nothing when no table cache
 * exists.
 */
void ctgFreeTbCache(SCtgDBCache* dbCache) {
  if (dbCache->tbCache != NULL) {
    int32_t removedTbls = taosHashGetSize(dbCache->tbCache);

    for (SCtgTbCache* pEntry = taosHashIterate(dbCache->tbCache, NULL); pEntry != NULL;
         pEntry = taosHashIterate(dbCache->tbCache, pEntry)) {
      ctgFreeTbCacheImpl(pEntry);
    }

    taosHashCleanup(dbCache->tbCache);
    dbCache->tbCache = NULL;

    CTG_CACHE_STAT_DEC(numOfTbl, removedTbls);
  }
}

H
Hongze Cheng 已提交
234
/*
 * Free a database vgroup descriptor: its vgroup hash table, its vgroup
 * array, and the descriptor itself. A NULL vgInfo is ignored.
 *
 * Only the local copy of the pointer is cleared; the caller's pointer is not
 * reset by this function.
 */
void ctgFreeVgInfo(SDBVgInfo* vgInfo) {
  if (vgInfo != NULL) {
    taosHashCleanup(vgInfo->vgHash);
    taosArrayDestroy(vgInfo->vgArray);

    taosMemoryFreeClear(vgInfo);
  }
}

H
Hongze Cheng 已提交
245
/*
 * Release the cached vgroup info of a database cache entry.
 *
 * ctgFreeVgInfo only clears its own local copy of the pointer, so the cache
 * field is reset here to avoid leaving a dangling pointer behind. This
 * mirrors how the stb and table caches clear their fields after cleanup.
 * dbCache must be non-NULL.
 */
void ctgFreeVgInfoCache(SCtgDBCache* dbCache) {
  ctgFreeVgInfo(dbCache->vgCache.vgInfo);
  dbCache->vgCache.vgInfo = NULL;
}
D
dapan1121 已提交
246

H
Hongze Cheng 已提交
247
/*
 * Release everything cached for one database: vgroup info, super-table
 * cache and table cache, in that order.
 *
 * The SCtgDBCache struct itself is not freed. A NULL dbCache is ignored.
 */
void ctgFreeDbCache(SCtgDBCache* dbCache) {
  if (dbCache != NULL) {
    ctgFreeVgInfoCache(dbCache);
    ctgFreeStbMetaCache(dbCache);
    ctgFreeTbCache(dbCache);
  }
}

D
dapan1121 已提交
257 258 259 260
/*
 * Tear down a catalog instance's database cache table.
 *
 * Each entry is first flagged as deleted (atomic store) and then released
 * with ctgFreeDbCache. Afterwards the hash table is cleaned up and the
 * numOfDb statistic is decremented by the number of entries the table held
 * on entry. A NULL table is ignored.
 */
void ctgFreeInstDbCache(SHashObj* pDbCache) {
  if (pDbCache == NULL) {
    return;
  }

  int32_t removedDbs = taosHashGetSize(pDbCache);

  for (void* pIter = taosHashIterate(pDbCache, NULL); pIter != NULL; pIter = taosHashIterate(pDbCache, pIter)) {
    SCtgDBCache* pEntry = pIter;

    atomic_store_8(&pEntry->deleted, 1);
    ctgFreeDbCache(pEntry);
  }

  taosHashCleanup(pDbCache);

  CTG_CACHE_STAT_DEC(numOfDb, removedDbs);
}
D
dapan1121 已提交
277

D
dapan1121 已提交
278 279 280
/*
 * Tear down a catalog instance's user-auth cache table.
 *
 * Each entry is released with ctgFreeSCtgUserAuth, the hash table is cleaned
 * up, and the numOfUser statistic is decremented by the number of entries
 * the table held on entry. A NULL table is ignored.
 */
void ctgFreeInstUserCache(SHashObj* pUserCache) {
  if (pUserCache == NULL) {
    return;
  }

  int32_t removedUsers = taosHashGetSize(pUserCache);

  for (void* pIter = taosHashIterate(pUserCache, NULL); pIter != NULL; pIter = taosHashIterate(pUserCache, pIter)) {
    ctgFreeSCtgUserAuth((SCtgUserAuth*)pIter);
  }

  taosHashCleanup(pUserCache);

  CTG_CACHE_STAT_DEC(numOfUser, removedUsers);
}
D
dapan1121 已提交
297

D
dapan1121 已提交
298 299 300
/*
 * Release a catalog handle without touching the cluster statistic or
 * writing a log line: both rent managers, the db cache, the user cache, and
 * finally the handle itself.
 *
 * pCtg must be non-NULL; it is invalid after this call.
 */
void ctgFreeHandleImpl(SCatalog* pCtg) {
  /* rent managers first, then the caches, matching ctgFreeHandle */
  ctgFreeMetaRent(&pCtg->dbRent);
  ctgFreeMetaRent(&pCtg->stbRent);

  ctgFreeInstDbCache(pCtg->dbCache);
  ctgFreeInstUserCache(pCtg->userCache);

  taosMemoryFree(pCtg);
}

D
dapan1121 已提交
308 309 310 311 312
/*
 * Release a catalog handle and account for it in the cache statistics.
 *
 * Frees both rent managers, the db cache and the user cache, decrements the
 * numOfCluster statistic, frees the handle, and logs the cluster id. The id
 * is copied up front because the handle is gone by the time the log line is
 * written. A NULL handle is ignored.
 */
void ctgFreeHandle(SCatalog* pCtg) {
  if (pCtg == NULL) {
    return;
  }

  /* captured before the handle is freed; used only for the final log line */
  uint64_t clusterId = pCtg->clusterId;

  ctgFreeMetaRent(&pCtg->dbRent);
  ctgFreeMetaRent(&pCtg->stbRent);

  ctgFreeInstDbCache(pCtg->dbCache);
  ctgFreeInstUserCache(pCtg->userCache);

  CTG_CACHE_STAT_DEC(numOfCluster, 1);

  taosMemoryFree(pCtg);

  ctgInfo("handle freed, clusterId:0x%" PRIx64, clusterId);
}

D
dapan1121 已提交
328 329 330 331
/*
 * Reset a catalog handle to an empty state while keeping the handle alive.
 *
 * All cached content (rent managers, db cache, user cache) is released and
 * then re-initialised from the global catalog configuration. Finally the
 * numOfClear statistic is incremented and the cluster id is logged.
 *
 * Failure to re-create either hash table is only logged; the corresponding
 * field is then left NULL and the function still completes. A NULL handle is
 * ignored.
 */
void ctgClearHandle(SCatalog* pCtg) {
  if (NULL == pCtg) {
    return;
  }

  uint64_t clusterId = pCtg->clusterId;

  ctgFreeMetaRent(&pCtg->dbRent);
  ctgFreeMetaRent(&pCtg->stbRent);

  ctgFreeInstDbCache(pCtg->dbCache);
  ctgFreeInstUserCache(pCtg->userCache);

  ctgMetaRentInit(&pCtg->dbRent, gCtgMgmt.cfg.dbRentSec, CTG_RENT_DB);
  ctgMetaRentInit(&pCtg->stbRent, gCtgMgmt.cfg.stbRentSec, CTG_RENT_STABLE);

  pCtg->dbCache = taosHashInit(gCtgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false,
                               HASH_ENTRY_LOCK);
  if (NULL == pCtg->dbCache) {
    /* report the capacity that was actually requested, not the compile-time default */
    qError("taosHashInit %d dbCache failed", gCtgMgmt.cfg.maxDBCacheNum);
  }

  pCtg->userCache = taosHashInit(gCtgMgmt.cfg.maxUserCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false,
                                 HASH_ENTRY_LOCK);
  if (NULL == pCtg->userCache) {
    ctgError("taosHashInit %d user cache failed", gCtgMgmt.cfg.maxUserCacheNum);
  }

  CTG_CACHE_STAT_INC(numOfClear, 1);

  ctgInfo("handle cleared, clusterId:0x%" PRIx64, clusterId);
}
D
dapan1121 已提交
360

D
dapan1121 已提交
361
/*
 * Free a use-db response object and the vgroup descriptor attached to it.
 *
 * The vgroup descriptor is released through ctgFreeVgInfo so that both its
 * hash table and its vgroup array are freed; releasing only the hash table
 * would leak the array whenever it has been populated.
 * NOTE(review): this assumes dbVgroup->vgArray, when set, is owned by this
 * descriptor (not shared with a cache entry) -- confirm against the code
 * that builds/clones SDBVgInfo.
 *
 * A NULL pOutput is ignored.
 */
void ctgFreeSUseDbOutput(SUseDbOutput* pOutput) {
  if (NULL == pOutput) {
    return;
  }

  if (pOutput->dbVgroup) {
    ctgFreeVgInfo(pOutput->dbVgroup);
    pOutput->dbVgroup = NULL;
  }

  taosMemoryFree(pOutput);
}

/*
 * Release the resources held by a message context.
 *
 * The target string is always freed. When an output object is attached, it
 * is released according to the request type that produced it and pCtx->out
 * is reset to NULL. For an unrecognised request type only an error is
 * logged and pCtx->out is left untouched.
 *
 * The SCtgMsgCtx struct itself is not freed; pCtx must be non-NULL.
 */
void ctgFreeMsgCtx(SCtgMsgCtx* pCtx) {
  taosMemoryFreeClear(pCtx->target);
  if (NULL == pCtx->out) {
    return;
  }

  /* pCtx->out is known to be non-NULL from here on */
  switch (pCtx->reqType) {
    case TDMT_MND_GET_DB_CFG: {
      SDbCfgInfo* pOut = (SDbCfgInfo*)pCtx->out;
      taosArrayDestroy(pOut->pRetensions);
      taosMemoryFreeClear(pCtx->out);
      break;
    }
    case TDMT_MND_USE_DB: {
      ctgFreeSUseDbOutput((SUseDbOutput*)pCtx->out);
      pCtx->out = NULL;
      break;
    }
    case TDMT_MND_GET_INDEX: {
      /* flat object: nothing nested to release */
      taosMemoryFreeClear(pCtx->out);
      break;
    }
    case TDMT_MND_QNODE_LIST: {
      taosArrayDestroy((SArray*)pCtx->out);
      pCtx->out = NULL;
      break;
    }
    case TDMT_VND_TABLE_META:
    case TDMT_MND_TABLE_META: {
      STableMetaOutput* pOut = (STableMetaOutput*)pCtx->out;
      taosMemoryFree(pOut->tbMeta);
      taosMemoryFreeClear(pCtx->out);
      break;
    }
    case TDMT_MND_GET_TABLE_INDEX: {
      STableIndex* pOut = (STableIndex*)pCtx->out;
      taosArrayDestroyEx(pOut->pIndex, tFreeSTableIndexInfo);
      taosMemoryFreeClear(pCtx->out);
      break;
    }
    case TDMT_VND_TABLE_CFG:
    case TDMT_MND_TABLE_CFG: {
      STableCfgRsp* pOut = (STableCfgRsp*)pCtx->out;
      tFreeSTableCfgRsp(pOut);
      taosMemoryFreeClear(pCtx->out);
      break;
    }
    case TDMT_MND_RETRIEVE_FUNC: {
      SFuncInfo* pOut = (SFuncInfo*)pCtx->out;
      taosMemoryFree(pOut->pCode);
      taosMemoryFree(pOut->pComment);
      taosMemoryFreeClear(pCtx->out);
      break;
    }
    case TDMT_MND_GET_USER_AUTH: {
      SGetUserAuthRsp* pOut = (SGetUserAuthRsp*)pCtx->out;
      taosHashCleanup(pOut->createdDbs);
      taosHashCleanup(pOut->readDbs);
      taosHashCleanup(pOut->writeDbs);
      taosMemoryFreeClear(pCtx->out);
      break;
    }
    default:
      qError("invalid reqType %d", pCtx->reqType);
      break;
  }
}

D
dapan1121 已提交
447 448 449 450 451 452 453 454
/*
 * Release a message context used by a batched table-meta task: the regular
 * context content (via ctgFreeMsgCtx) plus the saved previous output
 * (lastOut), which is treated as an STableMetaOutput.
 */
void ctgFreeTbMetasMsgCtx(SCtgMsgCtx* pCtx) {
  ctgFreeMsgCtx(pCtx);

  if (NULL == pCtx->lastOut) {
    return;
  }

  ctgFreeSTableMetaOutput((STableMetaOutput*)pCtx->lastOut);
  pCtx->lastOut = NULL;
}

D
dapan1121 已提交
455 456 457 458
/*
 * Free a table-meta output object together with the table meta it holds.
 * A NULL pOutput is ignored.
 */
void ctgFreeSTableMetaOutput(STableMetaOutput* pOutput) {
  if (pOutput != NULL) {
    taosMemoryFree(pOutput->tbMeta);
    taosMemoryFree(pOutput);
  }
}

/*
 * Put a table-meta task back into its initial state so it can be re-run.
 *
 * The task context's tbInfo is zeroed and its flag reset to
 * CTG_FLAG_UNKNOWN_STB; any previous/last outputs held by the message
 * context are released as STableMetaOutput objects; the message target and
 * the task result are freed and cleared.
 */
void ctgResetTbMetaTask(SCtgTask* pTask) {
  SCtgTbMetaCtx* pMetaCtx = (SCtgTbMetaCtx*)pTask->taskCtx;
  SCtgMsgCtx*    pMsgCtx = &pTask->msgCtx;

  memset(&pMetaCtx->tbInfo, 0, sizeof(pMetaCtx->tbInfo));
  pMetaCtx->flag = CTG_FLAG_UNKNOWN_STB;

  if (NULL != pMsgCtx->lastOut) {
    ctgFreeSTableMetaOutput((STableMetaOutput*)pMsgCtx->lastOut);
    pMsgCtx->lastOut = NULL;
  }

  if (NULL != pMsgCtx->out) {
    ctgFreeSTableMetaOutput((STableMetaOutput*)pMsgCtx->out);
    pMsgCtx->out = NULL;
  }

  taosMemoryFreeClear(pMsgCtx->target);
  taosMemoryFreeClear(pTask->res);
}

D
dapan1121 已提交
481 482 483 484
void ctgFreeBatchMeta(void* meta) {
  if (NULL == meta) {
    return;
  }
H
Hongze Cheng 已提交
485

D
dapan1121 已提交
486 487 488 489
  SMetaRes* pRes = (SMetaRes*)meta;
  taosMemoryFreeClear(pRes->pRes);
}

490 491 492 493
void ctgFreeBatchHash(void* hash) {
  if (NULL == hash) {
    return;
  }
H
Hongze Cheng 已提交
494

495 496 497 498
  SMetaRes* pRes = (SMetaRes*)hash;
  taosMemoryFreeClear(pRes->pRes);
}

H
Hongze Cheng 已提交
499
/*
 * Release a task result according to the task type that produced it and
 * reset *pRes to NULL.
 *
 *  - qnode / dnode / db-vgroup results are arrays and are destroyed.
 *  - db-cfg and table-cfg results release their nested data before the
 *    object itself is freed.
 *  - table-index results are arrays whose elements are released through
 *    tFreeSTableIndexInfo.
 *  - batch results: only each element's payload is freed; the array itself
 *    is not destroyed here.
 *    NOTE(review): presumably the array is the task context's pResList,
 *    which ctgFreeTaskCtx destroys -- confirm.
 *  - the remaining known types are plain heap blocks.
 *
 * An unknown type is logged as an error and *pRes is left untouched.
 */
void ctgFreeTaskRes(CTG_TASK_TYPE type, void** pRes) {
  switch (type) {
    case CTG_TASK_GET_QNODE:
    case CTG_TASK_GET_DNODE:
    case CTG_TASK_GET_DB_VGROUP:
      taosArrayDestroy((SArray*)*pRes);
      *pRes = NULL;
      break;

    case CTG_TASK_GET_DB_CFG:
      if (NULL != *pRes) {
        SDbCfgInfo* pCfg = (SDbCfgInfo*)*pRes;
        taosArrayDestroy(pCfg->pRetensions);
        taosMemoryFreeClear(*pRes);
      }
      break;

    case CTG_TASK_GET_TB_INDEX:
      taosArrayDestroyEx(*pRes, tFreeSTableIndexInfo);
      *pRes = NULL;
      break;

    case CTG_TASK_GET_TB_CFG:
      if (NULL != *pRes) {
        STableCfg* pCfg = (STableCfg*)*pRes;
        tFreeSTableCfgRsp(pCfg);
        taosMemoryFreeClear(*pRes);
      }
      break;

    case CTG_TASK_GET_TB_HASH:
    case CTG_TASK_GET_DB_INFO:
    case CTG_TASK_GET_INDEX:
    case CTG_TASK_GET_UDF:
    case CTG_TASK_GET_USER:
    case CTG_TASK_GET_SVR_VER:
    case CTG_TASK_GET_TB_META:
      taosMemoryFreeClear(*pRes);
      break;

    case CTG_TASK_GET_TB_META_BATCH: {
      SArray* pMetas = (SArray*)*pRes;
      int32_t total = taosArrayGetSize(pMetas);
      int32_t idx = 0;
      while (idx < total) {
        ctgFreeBatchMeta(taosArrayGet(pMetas, idx));
        ++idx;
      }
      *pRes = NULL;  // the array itself is not freed here
      break;
    }

    case CTG_TASK_GET_TB_HASH_BATCH: {
      SArray* pHashes = (SArray*)*pRes;
      int32_t total = taosArrayGetSize(pHashes);
      int32_t idx = 0;
      while (idx < total) {
        ctgFreeBatchHash(taosArrayGet(pHashes, idx));
        ++idx;
      }
      *pRes = NULL;  // the array itself is not freed here
      break;
    }

    default:
      qError("invalid task type %d", type);
      break;
  }
}

H
Hongze Cheng 已提交
563
/*
 * Release a sub-task result according to the task type that produced it and
 * reset *pRes to NULL.
 *
 * Differences from ctgFreeTaskRes visible in this function:
 *  - a db-vgroup sub-result is an SDBVgInfo, not an array. It is released
 *    through ctgFreeVgInfo so that both its hash table and its vgroup array
 *    are freed (freeing only the hash table would leak the array).
 *    NOTE(review): assumes the sub-result owns its vgArray -- confirm
 *    against the code that clones SDBVgInfo for sub-tasks.
 *  - batch results are arrays that ARE destroyed here, element payloads
 *    included.
 *
 * An unknown type is logged as an error and *pRes is left untouched.
 */
void ctgFreeSubTaskRes(CTG_TASK_TYPE type, void** pRes) {
  switch (type) {
    case CTG_TASK_GET_QNODE:
    case CTG_TASK_GET_DNODE: {
      taosArrayDestroy((SArray*)*pRes);
      *pRes = NULL;
      break;
    }
    case CTG_TASK_GET_DB_VGROUP: {
      if (*pRes) {
        ctgFreeVgInfo((SDBVgInfo*)*pRes);
        *pRes = NULL;
      }
      break;
    }
    case CTG_TASK_GET_DB_CFG: {
      if (*pRes) {
        SDbCfgInfo* pInfo = (SDbCfgInfo*)*pRes;
        taosArrayDestroy(pInfo->pRetensions);
        taosMemoryFreeClear(*pRes);
      }
      break;
    }
    case CTG_TASK_GET_TB_INDEX: {
      taosArrayDestroyEx(*pRes, tFreeSTableIndexInfo);
      *pRes = NULL;
      break;
    }
    case CTG_TASK_GET_TB_CFG: {
      if (*pRes) {
        STableCfg* pInfo = (STableCfg*)*pRes;
        tFreeSTableCfgRsp(pInfo);
        taosMemoryFreeClear(*pRes);
      }
      break;
    }
    case CTG_TASK_GET_TB_META:
    case CTG_TASK_GET_DB_INFO:
    case CTG_TASK_GET_TB_HASH:
    case CTG_TASK_GET_INDEX:
    case CTG_TASK_GET_UDF:
    case CTG_TASK_GET_SVR_VER:
    case CTG_TASK_GET_USER: {
      taosMemoryFreeClear(*pRes);
      break;
    }
    case CTG_TASK_GET_TB_META_BATCH: {
      taosArrayDestroyEx(*pRes, ctgFreeBatchMeta);
      *pRes = NULL;
      break;
    }
    case CTG_TASK_GET_TB_HASH_BATCH: {
      taosArrayDestroyEx(*pRes, ctgFreeBatchHash);
      *pRes = NULL;
      break;
    }
    default:
      qError("invalid task type %d", type);
      break;
  }
}

H
Hongze Cheng 已提交
626
/*
 * Reset a sub-task result slot: the code is set to 0 and, if a result is
 * attached, it is released according to the slot's task type.
 */
void ctgClearSubTaskRes(SCtgSubRes* pRes) {
  pRes->code = 0;

  if (pRes->res != NULL) {
    ctgFreeSubTaskRes(pRes->type, &pRes->res);
  }
}

/*
 * Release the type-specific context attached to a task and clear
 * pTask->taskCtx.
 *
 *  - Single-table tasks (meta / hash / index / cfg) free the table name held
 *    by the context (cfg also frees pVgInfo) before the context itself.
 *  - Table-meta tasks additionally release msgCtx.lastOut.
 *  - Batch tasks destroy the result list, the fetch list and the per-message
 *    contexts; the name list is not freed here.
 *  - All other known task types own nothing beyond the context block.
 *    CTG_TASK_GET_DNODE and CTG_TASK_GET_SVR_VER are included in that group:
 *    they are valid task types elsewhere in this file and previously fell
 *    through to the "invalid task type" error, which also skipped releasing
 *    the context.
 *    NOTE(review): assumes their taskCtx, when set, is a plain heap block
 *    like the other types in the group -- confirm.
 *
 * An unknown type is logged as an error and nothing is freed.
 */
void ctgFreeTaskCtx(SCtgTask* pTask) {
  switch (pTask->type) {
    case CTG_TASK_GET_TB_META: {
      SCtgTbMetaCtx* taskCtx = (SCtgTbMetaCtx*)pTask->taskCtx;
      taosMemoryFreeClear(taskCtx->pName);
      if (pTask->msgCtx.lastOut) {
        ctgFreeSTableMetaOutput((STableMetaOutput*)pTask->msgCtx.lastOut);
        pTask->msgCtx.lastOut = NULL;
      }
      taosMemoryFreeClear(pTask->taskCtx);
      break;
    }
    case CTG_TASK_GET_TB_META_BATCH: {
      SCtgTbMetasCtx* taskCtx = (SCtgTbMetasCtx*)pTask->taskCtx;
      taosArrayDestroyEx(taskCtx->pResList, ctgFreeBatchMeta);
      taosArrayDestroy(taskCtx->pFetchs);
      // NO NEED TO FREE pNames

      taosArrayDestroyEx(pTask->msgCtxs, (FDelete)ctgFreeTbMetasMsgCtx);

      if (pTask->msgCtx.lastOut) {
        ctgFreeSTableMetaOutput((STableMetaOutput*)pTask->msgCtx.lastOut);
        pTask->msgCtx.lastOut = NULL;
      }
      taosMemoryFreeClear(pTask->taskCtx);
      break;
    }
    case CTG_TASK_GET_TB_HASH: {
      SCtgTbHashCtx* taskCtx = (SCtgTbHashCtx*)pTask->taskCtx;
      taosMemoryFreeClear(taskCtx->pName);
      taosMemoryFreeClear(pTask->taskCtx);
      break;
    }
    case CTG_TASK_GET_TB_HASH_BATCH: {
      SCtgTbHashsCtx* taskCtx = (SCtgTbHashsCtx*)pTask->taskCtx;
      taosArrayDestroyEx(taskCtx->pResList, ctgFreeBatchHash);
      taosArrayDestroy(taskCtx->pFetchs);
      // NO NEED TO FREE pNames

      taosArrayDestroyEx(pTask->msgCtxs, (FDelete)ctgFreeMsgCtx);

      taosMemoryFreeClear(pTask->taskCtx);
      break;
    }
    case CTG_TASK_GET_TB_INDEX: {
      SCtgTbIndexCtx* taskCtx = (SCtgTbIndexCtx*)pTask->taskCtx;
      taosMemoryFreeClear(taskCtx->pName);
      taosMemoryFreeClear(pTask->taskCtx);
      break;
    }
    case CTG_TASK_GET_TB_CFG: {
      SCtgTbCfgCtx* taskCtx = (SCtgTbCfgCtx*)pTask->taskCtx;
      taosMemoryFreeClear(taskCtx->pName);
      taosMemoryFreeClear(taskCtx->pVgInfo);
      taosMemoryFreeClear(pTask->taskCtx);
      break;
    }
    case CTG_TASK_GET_DB_VGROUP:
    case CTG_TASK_GET_DB_CFG:
    case CTG_TASK_GET_DB_INFO:
    case CTG_TASK_GET_INDEX:
    case CTG_TASK_GET_UDF:
    case CTG_TASK_GET_QNODE:
    case CTG_TASK_GET_DNODE:
    case CTG_TASK_GET_SVR_VER:
    case CTG_TASK_GET_USER: {
      taosMemoryFreeClear(pTask->taskCtx);
      break;
    }
    default:
      qError("invalid task type %d", pTask->type);
      break;
  }
}

D
dapan1121 已提交
709 710 711 712 713 714 715 716 717
/*
 * Release everything attached to one task: its message context, its result,
 * its type-specific context, the parent list, and the sub-task result slot.
 *
 * The SCtgTask struct itself is not freed.
 */
void ctgFreeTask(SCtgTask* pTask) {
  /* message context, result and task context, in this order */
  ctgFreeMsgCtx(&pTask->msgCtx);
  ctgFreeTaskRes(pTask->type, &pTask->res);
  ctgFreeTaskCtx(pTask);

  /* bookkeeping shared with parent / sub tasks */
  taosArrayDestroy(pTask->pParents);
  ctgClearSubTaskRes(&pTask->subRes);
}

D
dapan1121 已提交
718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735
/*
 * Release every task stored in the array (via ctgFreeTask) and then destroy
 * the array. A NULL array is ignored.
 */
void ctgFreeTasks(SArray* pArray) {
  if (pArray == NULL) {
    return;
  }

  int32_t taskNum = taosArrayGetSize(pArray);
  int32_t idx = 0;
  while (idx < taskNum) {
    ctgFreeTask((SCtgTask*)taosArrayGet(pArray, idx));
    ++idx;
  }

  taosArrayDestroy(pArray);
}

void ctgFreeJob(void* job) {
  if (NULL == job) {
    return;
  }
H
Hongze Cheng 已提交
736

D
dapan1121 已提交
737 738
  SCtgJob* pJob = (SCtgJob*)job;

H
Hongze Cheng 已提交
739
  int64_t  rid = pJob->refId;
D
dapan1121 已提交
740 741 742
  uint64_t qid = pJob->queryId;

  ctgFreeTasks(pJob->pTasks);
D
dapan1121 已提交
743
  ctgFreeBatchs(pJob->pBatchs);
D
dapan1121 已提交
744 745 746 747 748

  ctgFreeSMetaData(&pJob->jobRes);

  taosMemoryFree(job);

D
dapan1121 已提交
749
  qDebug("QID:0x%" PRIx64 ", ctg job 0x%" PRIx64 " freed", qid, rid);
D
dapan1121 已提交
750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768
}

/*
 * Replace the content of a message context.
 *
 * Whatever the context held before is released first. The new request type
 * and output pointer are then stored, and the target string (if given) is
 * duplicated into the context; a NULL target leaves the context's target
 * NULL.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY (through
 * CTG_ERR_RET) when the target string cannot be duplicated. In that case
 * reqType and out have already been stored in the context.
 */
int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* target) {
  ctgFreeMsgCtx(pCtx);

  pCtx->reqType = reqType;
  pCtx->out = out;
  pCtx->target = NULL;

  if (NULL == target) {
    return TSDB_CODE_SUCCESS;
  }

  pCtx->target = strdup(target);
  if (NULL == pCtx->target) {
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
769 770
/*
 * Append a new message context to the array.
 *
 * The context records the request type and output pointer, plus a private
 * copy of the target string when one is given.
 *
 * Returns TSDB_CODE_SUCCESS on success. Returns TSDB_CODE_OUT_OF_MEMORY
 * (through CTG_ERR_RET) when the target string cannot be duplicated or the
 * context cannot be appended; in the latter case the duplicated target is
 * released so it does not leak. `out` is never freed here: on failure it
 * stays with the caller.
 */
int32_t ctgAddMsgCtx(SArray* pCtxs, int32_t reqType, void* out, char* target) {
  SCtgMsgCtx ctx = {0};

  ctx.reqType = reqType;
  ctx.out = out;
  if (target) {
    ctx.target = strdup(target);
    if (NULL == ctx.target) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
  }

  /* taosArrayPush copies ctx into the array; a NULL return means nothing was stored */
  if (NULL == taosArrayPush(pCtxs, &ctx)) {
    taosMemoryFreeClear(ctx.target);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
786
/*
 * Select the table-name hash function for a hash method.
 *
 * Every hash method currently resolves to MurmurHash3_32, so hashMethod does
 * not influence the result. Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgGetHashFunction(int8_t hashMethod, tableNameHashFp* fp) {
  (void)hashMethod;  /* all methods map to the same function */

  *fp = MurmurHash3_32;

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
796 797 798 799 800 801
/*
 * Build a flat array containing a copy of every SVgroupInfo stored in a
 * vgroup hash table.
 *
 * On success *pList receives the newly created array (the caller becomes
 * responsible for destroying it) and TSDB_CODE_SUCCESS is returned. On
 * allocation failure TSDB_CODE_OUT_OF_MEMORY is returned, the partially
 * built array is destroyed, and *pList is left untouched.
 *
 * pCtg is not read directly in this function; it is kept in scope for the
 * ctgError / ctgDebug logging calls.
 */
int32_t ctgGenerateVgList(SCatalog* pCtg, SHashObj* vgHash, SArray** pList) {
  int32_t code = 0;
  int32_t vgNum = taosHashGetSize(vgHash);

  SArray* vgList = taosArrayInit(vgNum, sizeof(SVgroupInfo));
  if (NULL == vgList) {
    ctgError("taosArrayInit failed, num:%d", vgNum);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  void* pIter = taosHashIterate(vgHash, NULL);
  while (pIter) {
    SVgroupInfo* vgInfo = pIter;

    if (NULL == taosArrayPush(vgList, vgInfo)) {
      ctgError("taosArrayPush failed, vgId:%d", vgInfo->vgId);
      /* leaving the loop early: the in-progress iteration must be cancelled */
      taosHashCancelIterate(vgHash, pIter);
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }

    pIter = taosHashIterate(vgHash, pIter);
  }

  *pList = vgList;

  ctgDebug("Got vgList from cache, vgNum:%d", vgNum);

  return TSDB_CODE_SUCCESS;

_return:

  if (vgList) {
    taosArrayDestroy(vgList);
  }

  CTG_RET(code);
}

838
int ctgVgInfoComp(const void* lp, const void* rp) {
D
dapan1121 已提交
839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862
  SVgroupInfo* pLeft = (SVgroupInfo*)lp;
  SVgroupInfo* pRight = (SVgroupInfo*)rp;
  if (pLeft->hashBegin < pRight->hashBegin) {
    return -1;
  } else if (pLeft->hashBegin > pRight->hashBegin) {
    return 1;
  }

  return 0;
}

/*
 * Search callback matching a hash value against a vgroup's hash range.
 *
 * lp points to the uint32_t hash value, rp to an SVgroupInfo.
 * Returns -1 when the value lies below hashBegin, 1 when it lies above
 * hashEnd, and 0 when it falls inside [hashBegin, hashEnd].
 */
int32_t ctgHashValueComp(void const* lp, void const* rp) {
  uint32_t     hashVal = *(uint32_t*)lp;
  SVgroupInfo* pRange = (SVgroupInfo*)rp;

  if (hashVal < pRange->hashBegin) {
    return -1;
  }

  return (hashVal > pRange->hashEnd) ? 1 : 0;
}

H
Hongze Cheng 已提交
863
/*
 * Finds the vgroup whose hash range contains the hash of pTableName's full
 * name and copies it into *pVgroup.
 *
 * Returns TSDB_CODE_TSC_DB_NOT_SELECTED when dbInfo->vgArray holds no vgroups,
 * and TSDB_CODE_CTG_INTERNAL_ERROR when no range in vgArray matches the hash.
 */
int32_t ctgGetVgInfoFromHashValue(SCatalog* pCtg, SDBVgInfo* dbInfo, const SName* pTableName, SVgroupInfo* pVgroup) {
  int32_t code = 0;

  // The db name is resolved up front because both error logs below report it.
  char db[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pTableName, db);

  int32_t vgNum = taosArrayGetSize(dbInfo->vgArray);
  if (vgNum <= 0) {
    ctgError("db vgroup cache invalid, db:%s, vgroup number:%d", db, vgNum);
    CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED);
  }

  char tbFullName[TSDB_TABLE_FNAME_LEN];
  tNameExtractFullName(pTableName, tbFullName);

  uint32_t nameLen = (uint32_t)strlen(tbFullName);
  uint32_t hashValue =
      taosGetTbHashVal(tbFullName, nameLen, dbInfo->hashMethod, dbInfo->hashPrefix, dbInfo->hashSuffix);

  SVgroupInfo* pFound = taosArraySearch(dbInfo->vgArray, &hashValue, ctgHashValueComp, TD_EQ);
  if (NULL == pFound) {
    ctgError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, db,
             (int32_t)taosArrayGetSize(dbInfo->vgArray));
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
  }

  *pVgroup = *pFound;

  ctgDebug("Got tb %s hash vgroup, vgId:%d, epNum %d, current %s port %d", tbFullName, pFound->vgId,
           pFound->epSet.numOfEps, pFound->epSet.eps[pFound->epSet.inUse].fqdn,
           pFound->epSet.eps[pFound->epSet.inUse].port);

  CTG_RET(code);
}

H
Hongze Cheng 已提交
913 914 915
int32_t ctgGetVgInfosFromHashValue(SCatalog* pCtg, SCtgTaskReq* tReq, SDBVgInfo* dbInfo, SCtgTbHashsCtx* pCtx,
                                   char* dbFName, SArray* pNames, bool update) {
  int32_t   code = 0;
D
dapan1121 已提交
916
  SCtgTask* pTask = tReq->pTask;
H
Hongze Cheng 已提交
917
  SMetaRes  res = {0};
918
  int32_t   vgNum = taosArrayGetSize(dbInfo->vgArray);
919 920
  if (vgNum <= 0) {
    ctgError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum);
D
dapan1121 已提交
921
    CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
922 923
  }

H
Hongze Cheng 已提交
924 925
  SVgroupInfo* vgInfo = NULL;
  int32_t      tbNum = taosArrayGetSize(pNames);
926 927 928 929 930 931 932 933

  if (1 == vgNum) {
    for (int32_t i = 0; i < tbNum; ++i) {
      vgInfo = taosMemoryMalloc(sizeof(SVgroupInfo));
      if (NULL == vgInfo) {
        CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
      }

934
      *vgInfo = *(SVgroupInfo*)taosArrayGet(dbInfo->vgArray, 0);
935 936

      ctgDebug("Got tb hash vgroup, vgId:%d, epNum %d, current %s port %d", vgInfo->vgId, vgInfo->epSet.numOfEps,
H
Hongze Cheng 已提交
937
               vgInfo->epSet.eps[vgInfo->epSet.inUse].fqdn, vgInfo->epSet.eps[vgInfo->epSet.inUse].port);
938 939

      if (update) {
D
dapan1121 已提交
940
        SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, tReq->msgIdx);
H
Hongze Cheng 已提交
941
        SMetaRes*  pRes = taosArrayGet(pCtx->pResList, pFetch->resIdx + i);
942 943 944 945 946 947 948 949 950 951 952 953 954
        pRes->pRes = vgInfo;
      } else {
        res.pRes = vgInfo;
        taosArrayPush(pCtx->pResList, &res);
      }
    }

    return TSDB_CODE_SUCCESS;
  }

  char tbFullName[TSDB_TABLE_FNAME_LEN];
  sprintf(tbFullName, "%s.", dbFName);
  int32_t offset = strlen(tbFullName);
H
Hongze Cheng 已提交
955
  SName*  pName = NULL;
956
  int32_t tbNameLen = 0;
H
Hongze Cheng 已提交
957

958
  for (int32_t i = 0; i < tbNum; ++i) {
959
    pName = taosArrayGet(pNames, i);
960 961 962 963

    tbNameLen = offset + strlen(pName->tname);
    strcpy(tbFullName + offset, pName->tname);

H
Hongze Cheng 已提交
964 965
    uint32_t hashValue = taosGetTbHashVal(tbFullName, (uint32_t)strlen(tbFullName), dbInfo->hashMethod,
                                          dbInfo->hashPrefix, dbInfo->hashSuffix);
966

967 968
    vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, ctgHashValueComp, TD_EQ);
    if (NULL == vgInfo) {
D
dapan1121 已提交
969
      ctgError("2no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName,
970
               taosArrayGetSize(dbInfo->vgArray));
971 972 973 974 975 976 977 978 979 980
      CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
    }

    SVgroupInfo* pNewVg = taosMemoryMalloc(sizeof(SVgroupInfo));
    if (NULL == pNewVg) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    *pNewVg = *vgInfo;

H
Hongze Cheng 已提交
981 982 983
    ctgDebug("Got tb %s hash vgroup, vgId:%d, epNum %d, current %s port %d", tbFullName, vgInfo->vgId,
             vgInfo->epSet.numOfEps, vgInfo->epSet.eps[vgInfo->epSet.inUse].fqdn,
             vgInfo->epSet.eps[vgInfo->epSet.inUse].port);
984 985

    if (update) {
D
dapan1121 已提交
986
      SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, tReq->msgIdx);
H
Hongze Cheng 已提交
987
      SMetaRes*  pRes = taosArrayGet(pCtx->pResList, pFetch->resIdx + i);
988 989 990 991
      pRes->pRes = pNewVg;
    } else {
      res.pRes = pNewVg;
      taosArrayPush(pCtx->pResList, &res);
H
Hongze Cheng 已提交
992
    }
993 994 995 996 997
  }

  CTG_RET(code);
}

D
dapan1121 已提交
998
int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2) {
H
Hongze Cheng 已提交
999
  if (*(uint64_t*)key1 < ((SSTableVersion*)key2)->suid) {
D
dapan1121 已提交
1000
    return -1;
H
Hongze Cheng 已提交
1001
  } else if (*(uint64_t*)key1 > ((SSTableVersion*)key2)->suid) {
D
dapan1121 已提交
1002 1003 1004 1005 1006 1007 1008
    return 1;
  } else {
    return 0;
  }
}

int32_t ctgDbVgVersionSearchCompare(const void* key1, const void* key2) {
H
Hongze Cheng 已提交
1009
  if (*(int64_t*)key1 < ((SDbVgVersion*)key2)->dbId) {
D
dapan1121 已提交
1010
    return -1;
H
Hongze Cheng 已提交
1011
  } else if (*(int64_t*)key1 > ((SDbVgVersion*)key2)->dbId) {
D
dapan1121 已提交
1012 1013 1014 1015 1016 1017 1018
    return 1;
  } else {
    return 0;
  }
}

int32_t ctgStbVersionSortCompare(const void* key1, const void* key2) {
D
dapan1121 已提交
1019
  if (((SSTableVersion*)key1)->suid < ((SSTableVersion*)key2)->suid) {
D
dapan1121 已提交
1020
    return -1;
D
dapan1121 已提交
1021
  } else if (((SSTableVersion*)key1)->suid > ((SSTableVersion*)key2)->suid) {
D
dapan1121 已提交
1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037
    return 1;
  } else {
    return 0;
  }
}

int32_t ctgDbVgVersionSortCompare(const void* key1, const void* key2) {
  if (((SDbVgVersion*)key1)->dbId < ((SDbVgVersion*)key2)->dbId) {
    return -1;
  } else if (((SDbVgVersion*)key1)->dbId > ((SDbVgVersion*)key2)->dbId) {
    return 1;
  } else {
    return 0;
  }
}

H
Hongze Cheng 已提交
1038
/*
 * Creates a copy of src in *dst.
 *
 * The struct is copied bytewise first, then vgHash is rebuilt entry by entry
 * and vgArray (when present in src) is duplicated, so the clone has its own
 * containers.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when any allocation
 * fails. On failure everything allocated here is released and *dst is
 * cleared.
 */
int32_t ctgCloneVgInfo(SDBVgInfo* src, SDBVgInfo** dst) {
  *dst = taosMemoryMalloc(sizeof(SDBVgInfo));
  if (NULL == *dst) {
    qError("malloc %d failed", (int32_t)sizeof(SDBVgInfo));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  memcpy(*dst, src, sizeof(SDBVgInfo));

  size_t hashSize = taosHashGetSize(src->vgHash);
  (*dst)->vgHash = taosHashInit(hashSize, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
  if (NULL == (*dst)->vgHash) {
    qError("taosHashInit %d failed", (int32_t)hashSize);
    taosMemoryFreeClear(*dst);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  void* pIter = taosHashIterate(src->vgHash, NULL);
  while (pIter) {
    int32_t* vgId = taosHashGetKey(pIter, NULL);

    if (taosHashPut((*dst)->vgHash, (void*)vgId, sizeof(int32_t), pIter, sizeof(SVgroupInfo))) {
      qError("taosHashPut failed, hashSize:%d", (int32_t)hashSize);
      // The loop is abandoned mid-iteration, so the iterator is cancelled explicitly.
      taosHashCancelIterate(src->vgHash, pIter);
      taosHashCleanup((*dst)->vgHash);
      taosMemoryFreeClear(*dst);
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    pIter = taosHashIterate(src->vgHash, pIter);
  }

  if (src->vgArray) {
    (*dst)->vgArray = taosArrayDup(src->vgArray, NULL);
    if (NULL == (*dst)->vgArray) {
      // Without this check the clone would be returned as valid with no vgroup array.
      qError("taosArrayDup failed, vgNum:%d", (int32_t)taosArrayGetSize(src->vgArray));
      taosHashCleanup((*dst)->vgHash);
      taosMemoryFreeClear(*dst);
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
  }

  return TSDB_CODE_SUCCESS;
}

H
Hongze Cheng 已提交
1078
/*
 * Creates a copy of output in *pOutput.
 *
 * The struct is copied bytewise; when output->tbMeta is set, a separate
 * buffer of CTG_META_SIZE(output->tbMeta) bytes is allocated and filled so
 * the clone does not share tbMeta with the original.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when an allocation
 * fails (in which case *pOutput is cleared).
 */
int32_t ctgCloneMetaOutput(STableMetaOutput* output, STableMetaOutput** pOutput) {
  *pOutput = taosMemoryMalloc(sizeof(STableMetaOutput));
  if (NULL == *pOutput) {
    qError("malloc %d failed", (int32_t)sizeof(STableMetaOutput));
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  memcpy(*pOutput, output, sizeof(STableMetaOutput));

  if (output->tbMeta) {
    int32_t metaSize = CTG_META_SIZE(output->tbMeta);
    (*pOutput)->tbMeta = taosMemoryMalloc(metaSize);
    if (NULL == (*pOutput)->tbMeta) {
      // Report the size that actually failed to allocate.
      qError("malloc %d failed", metaSize);
      taosMemoryFreeClear(*pOutput);
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    memcpy((*pOutput)->tbMeta, output->tbMeta, metaSize);

    // Logged only once the clone really exists.
    qDebug("tbMeta cloned, size:%d, p:%p", metaSize, (*pOutput)->tbMeta);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115
/*
 * Creates a copy of the STableIndexInfo array pIndex in *pRes, duplicating
 * each element's expr string so the clone does not share it with the source.
 *
 * A NULL pIndex yields *pRes == NULL and TSDB_CODE_SUCCESS.
 * Returns TSDB_CODE_OUT_OF_MEMORY when the array, an element, or an expr
 * string cannot be allocated; in that case everything cloned so far is
 * released and *pRes is set to NULL.
 */
int32_t ctgCloneTableIndex(SArray* pIndex, SArray** pRes) {
  if (NULL == pIndex) {
    *pRes = NULL;
    return TSDB_CODE_SUCCESS;
  }

  int32_t num = taosArrayGetSize(pIndex);
  *pRes = taosArrayInit(num, sizeof(STableIndexInfo));
  if (NULL == *pRes) {
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  for (int32_t i = 0; i < num; ++i) {
    STableIndexInfo* pSrc = taosArrayGet(pIndex, i);
    STableIndexInfo* pNew = taosArrayPush(*pRes, pSrc);
    if (NULL == pNew) {
      goto _failed;
    }

    // The pushed element still aliases the source's expr; replace it with a
    // private copy. A failed strdup leaves NULL here, which the cleanup below
    // handles without touching the source's string.
    if (pSrc->expr) {
      pNew->expr = strdup(pSrc->expr);
      if (NULL == pNew->expr) {
        goto _failed;
      }
    }
  }

  return TSDB_CODE_SUCCESS;

_failed:

  // Every element now in *pRes holds either an owned copy or NULL in expr.
  // NOTE(review): assumes strdup'd memory may be released with taosMemoryFree - confirm.
  for (int32_t i = 0; i < (int32_t)taosArrayGetSize(*pRes); ++i) {
    STableIndexInfo* pInfo = taosArrayGet(*pRes, i);
    taosMemoryFree(pInfo->expr);
  }
  taosArrayDestroy(*pRes);
  *pRes = NULL;

  CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}

H
Hongze Cheng 已提交
1124
/*
 * Fills pMsgSendInfo->target according to the message type.
 *
 * TDMT_VND_TABLE_META, TDMT_VND_TABLE_CFG and TDMT_VND_BATCH_META are marked
 * as vnode targets and carry vgId plus a private copy of dbFName; every other
 * type is marked as an mnode target.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when dbFName cannot
 * be duplicated.
 */
int32_t ctgUpdateSendTargetInfo(SMsgSendInfo* pMsgSendInfo, int32_t msgType, char* dbFName, int32_t vgId) {
  if (msgType == TDMT_VND_TABLE_META || msgType == TDMT_VND_TABLE_CFG || msgType == TDMT_VND_BATCH_META) {
    pMsgSendInfo->target.type = TARGET_TYPE_VNODE;
    pMsgSendInfo->target.vgId = vgId;
    pMsgSendInfo->target.dbFName = strdup(dbFName);
    if (NULL == pMsgSendInfo->target.dbFName) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
  } else {
    pMsgSendInfo->target.type = TARGET_TYPE_MNODE;
  }

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
1135

H
Hongze Cheng 已提交
1136
/*
 * Sums the number of tables over all STablesReq entries in pList.
 * Returns 0 when pList is NULL.
 */
int32_t ctgGetTablesReqNum(SArray* pList) {
  int32_t tbTotal = 0;

  if (NULL != pList) {
    int32_t reqNum = taosArrayGetSize(pList);
    for (int32_t idx = 0; idx < reqNum; ++idx) {
      STablesReq* pDbReq = taosArrayGet(pList, idx);
      tbTotal += taosArrayGetSize(pDbReq->pTables);
    }
  }

  return tbTotal;
}

H
Hongze Cheng 已提交
1151
/*
 * Appends a fetch record to *pFetchs, creating the array on first use.
 *
 * The record stores dbIdx, tbIdx, resIdx and flag as given and takes the
 * current value of *fetchIdx as its fetchIdx. *fetchIdx is advanced only
 * after the record has been stored.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the array cannot
 * be created or the record cannot be appended.
 */
int32_t ctgAddFetch(SArray** pFetchs, int32_t dbIdx, int32_t tbIdx, int32_t* fetchIdx, int32_t resIdx, int32_t flag) {
  if (NULL == (*pFetchs)) {
    *pFetchs = taosArrayInit(CTG_DEFAULT_FETCH_NUM, sizeof(SCtgFetch));
    if (NULL == (*pFetchs)) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
  }

  SCtgFetch fetch = {0};
  fetch.dbIdx = dbIdx;
  fetch.tbIdx = tbIdx;
  fetch.fetchIdx = *fetchIdx;
  fetch.resIdx = resIdx;
  fetch.flag = flag;

  if (NULL == taosArrayPush(*pFetchs, &fetch)) {
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  // Consume the index only once a record really uses it.
  ++(*fetchIdx);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
1168 1169 1170 1171
/*
 * Returns the table name addressed by pFetch: element tbIdx of the pTables
 * array of the STablesReq stored at position dbIdx in pNames.
 */
SName* ctgGetFetchName(SArray* pNames, SCtgFetch* pFetch) {
  STablesReq* pDbReq = taosArrayGet(pNames, pFetch->dbIdx);
  SName*      pTbName = taosArrayGet(pDbReq->pTables, pFetch->tbIdx);

  return pTbName;
}
1172

H
Haojun Liao 已提交
1173
/* Clone callback for a db-vgroup payload: duplicates the SArray behind pSrc. */
static void* ctgCloneDbVgroup(void* pSrc) {
  const SArray* pVgArray = (const SArray*)pSrc;
  return taosArrayDup(pVgArray, NULL);
}
1174

H
Hongze Cheng 已提交
1175
static void ctgFreeDbVgroup(void* p) { taosArrayDestroy((SArray*)((SMetaRes*)p)->pRes); }
1176 1177 1178 1179 1180 1181 1182 1183 1184 1185

/*
 * Clone callback: returns a newly allocated bytewise copy of the SDbCfgInfo
 * at pSrc, or NULL when the allocation fails.
 */
static void* ctgCloneDbCfgInfo(void* pSrc) {
  SDbCfgInfo* pCopy = taosMemoryMalloc(sizeof(SDbCfgInfo));
  if (NULL != pCopy) {
    memcpy(pCopy, pSrc, sizeof(SDbCfgInfo));
  }
  return pCopy;
}

H
Hongze Cheng 已提交
1186
static void ctgFreeDbCfgInfo(void* p) { taosMemoryFree(((SMetaRes*)p)->pRes); }
1187 1188 1189 1190 1191 1192 1193 1194 1195 1196

/*
 * Clone callback: returns a newly allocated bytewise copy of the SDbInfo at
 * pSrc, or NULL when the allocation fails.
 */
static void* ctgCloneDbInfo(void* pSrc) {
  SDbInfo* pCopy = taosMemoryMalloc(sizeof(SDbInfo));
  if (NULL != pCopy) {
    memcpy(pCopy, pSrc, sizeof(SDbInfo));
  }
  return pCopy;
}

H
Hongze Cheng 已提交
1197
static void ctgFreeDbInfo(void* p) { taosMemoryFree(((SMetaRes*)p)->pRes); }
1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209

static void* ctgCloneTableMeta(void* pSrc) {
  STableMeta* pMeta = pSrc;
  int32_t size = sizeof(STableMeta) + (pMeta->tableInfo.numOfColumns + pMeta->tableInfo.numOfTags) * sizeof(SSchema);
  STableMeta* pDst = taosMemoryMalloc(size);
  if (NULL == pDst) {
    return NULL;
  }
  memcpy(pDst, pSrc, size);
  return pDst;
}

H
Hongze Cheng 已提交
1210
static void ctgFreeTableMeta(void* p) { taosMemoryFree(((SMetaRes*)p)->pRes); }
1211 1212 1213 1214 1215 1216 1217 1218 1219 1220

/*
 * Clone callback: returns a newly allocated bytewise copy of the SVgroupInfo
 * at pSrc, or NULL when the allocation fails.
 */
static void* ctgCloneVgroupInfo(void* pSrc) {
  SVgroupInfo* pCopy = taosMemoryMalloc(sizeof(SVgroupInfo));
  if (NULL != pCopy) {
    memcpy(pCopy, pSrc, sizeof(SVgroupInfo));
  }
  return pCopy;
}

H
Hongze Cheng 已提交
1221
static void ctgFreeVgroupInfo(void* p) { taosMemoryFree(((SMetaRes*)p)->pRes); }
1222

H
Haojun Liao 已提交
1223
/*
 * Clone callback for a table-index payload: duplicates the SArray behind
 * pSrc. No per-element dup function is supplied to taosArrayDup.
 */
static void* ctgCloneTableIndices(void* pSrc) {
  const SArray* pIndices = (const SArray*)pSrc;
  return taosArrayDup(pIndices, NULL);
}
1224

H
Hongze Cheng 已提交
1225
static void ctgFreeTableIndices(void* p) { taosArrayDestroy((SArray*)((SMetaRes*)p)->pRes); }
1226 1227 1228 1229 1230 1231 1232 1233 1234 1235

/*
 * Clone callback: returns a newly allocated bytewise copy of the SFuncInfo at
 * pSrc, or NULL when the allocation fails.
 */
static void* ctgCloneFuncInfo(void* pSrc) {
  SFuncInfo* pCopy = taosMemoryMalloc(sizeof(SFuncInfo));
  if (NULL != pCopy) {
    memcpy(pCopy, pSrc, sizeof(SFuncInfo));
  }
  return pCopy;
}

H
Hongze Cheng 已提交
1236
static void ctgFreeFuncInfo(void* p) { taosMemoryFree(((SMetaRes*)p)->pRes); }
1237 1238 1239 1240 1241 1242 1243 1244 1245 1246

/*
 * Clone callback: returns a newly allocated bytewise copy of the SIndexInfo
 * at pSrc, or NULL when the allocation fails.
 */
static void* ctgCloneIndexInfo(void* pSrc) {
  SIndexInfo* pCopy = taosMemoryMalloc(sizeof(SIndexInfo));
  if (NULL != pCopy) {
    memcpy(pCopy, pSrc, sizeof(SIndexInfo));
  }
  return pCopy;
}

H
Hongze Cheng 已提交
1247
static void ctgFreeIndexInfo(void* p) { taosMemoryFree(((SMetaRes*)p)->pRes); }
1248 1249 1250 1251 1252 1253 1254 1255 1256 1257

/*
 * Clone callback: returns a newly allocated copy of the bool at pSrc, or NULL
 * when the allocation fails.
 */
static void* ctgCloneUserAuth(void* pSrc) {
  bool* pCopy = taosMemoryMalloc(sizeof(bool));
  if (NULL != pCopy) {
    memcpy(pCopy, pSrc, sizeof(bool));
  }
  return pCopy;
}

H
Hongze Cheng 已提交
1258
static void ctgFreeUserAuth(void* p) { taosMemoryFree(((SMetaRes*)p)->pRes); }
1259

H
Haojun Liao 已提交
1260
/* Clone callback for a qnode-list payload: duplicates the SArray behind pSrc. */
static void* ctgCloneQnodeList(void* pSrc) {
  const SArray* pQnodes = (const SArray*)pSrc;
  return taosArrayDup(pQnodes, NULL);
}
D
dapan1121 已提交
1261

H
Hongze Cheng 已提交
1262
static void ctgFreeQnodeList(void* p) { taosArrayDestroy((SArray*)((SMetaRes*)p)->pRes); }
1263 1264 1265 1266 1267 1268 1269 1270 1271 1272

/*
 * Clone callback: returns a newly allocated bytewise copy of the STableCfg at
 * pSrc, or NULL when the allocation fails.
 * NOTE(review): the copy is shallow; if STableCfg holds pointers they are
 * shared with the source - confirm this is intended.
 */
static void* ctgCloneTableCfg(void* pSrc) {
  STableCfg* pCopy = taosMemoryMalloc(sizeof(STableCfg));
  if (NULL != pCopy) {
    memcpy(pCopy, pSrc, sizeof(STableCfg));
  }
  return pCopy;
}

H
Hongze Cheng 已提交
1273
static void ctgFreeTableCfg(void* p) { taosMemoryFree(((SMetaRes*)p)->pRes); }
1274

H
Haojun Liao 已提交
1275
/* Clone callback for a dnode-list payload: duplicates the SArray behind pSrc. */
static void* ctgCloneDnodeList(void* pSrc) {
  const SArray* pDnodes = (const SArray*)pSrc;
  return taosArrayDup(pDnodes, NULL);
}
1276

H
Hongze Cheng 已提交
1277
static void ctgFreeDnodeList(void* p) { taosArrayDestroy((SArray*)((SMetaRes*)p)->pRes); }
1278

H
Haojun Liao 已提交
1279
/*
 * Clones an array of SMetaRes into a newly created array stored in *pDst.
 *
 * Each element keeps its code; its payload is cloned with copyFunc. An
 * element whose payload is NULL is cloned as-is (code plus NULL payload)
 * without calling copyFunc, because the clone callbacks read from their
 * argument unconditionally.
 *
 * A NULL pSrc leaves *pDst untouched and returns TSDB_CODE_SUCCESS.
 * Returns TSDB_CODE_OUT_OF_MEMORY when the array, a payload clone, or an
 * append fails; elements cloned so far stay in *pDst for the caller to
 * release.
 */
static int32_t ctgCloneMetaDataArray(SArray* pSrc, __array_item_dup_fn_t copyFunc, SArray** pDst) {
  if (NULL == pSrc) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t size = taosArrayGetSize(pSrc);
  *pDst = taosArrayInit(size, sizeof(SMetaRes));
  if (NULL == *pDst) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t i = 0; i < size; ++i) {
    SMetaRes* pRes = taosArrayGet(pSrc, i);
    SMetaRes  res = {.code = pRes->code, .pRes = NULL};

    if (NULL != pRes->pRes) {
      res.pRes = copyFunc(pRes->pRes);
      if (NULL == res.pRes) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
    }

    if (NULL == taosArrayPush(*pDst, &res)) {
      // NOTE(review): res.pRes cannot be released here because the matching
      // free routine is not known to this function.
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Returns a newly allocated clone of pData, or NULL on failure.
 *
 * The twelve result arrays are cloned one after another with their matching
 * clone callbacks; the first failure stops the sequence, releases the partial
 * clone through catalogFreeMetaData and yields NULL. The clone starts
 * zero-initialised and only these arrays are filled in.
 */
SMetaData* catalogCloneMetaData(SMetaData* pData) {
  SMetaData* pClone = taosMemoryCalloc(1, sizeof(SMetaData));
  if (NULL == pClone) {
    return NULL;
  }

  int32_t code = ctgCloneMetaDataArray(pData->pDbVgroup, ctgCloneDbVgroup, &pClone->pDbVgroup);
  if (TSDB_CODE_SUCCESS != code) goto _failed;

  code = ctgCloneMetaDataArray(pData->pDbCfg, ctgCloneDbCfgInfo, &pClone->pDbCfg);
  if (TSDB_CODE_SUCCESS != code) goto _failed;

  code = ctgCloneMetaDataArray(pData->pDbInfo, ctgCloneDbInfo, &pClone->pDbInfo);
  if (TSDB_CODE_SUCCESS != code) goto _failed;

  code = ctgCloneMetaDataArray(pData->pTableMeta, ctgCloneTableMeta, &pClone->pTableMeta);
  if (TSDB_CODE_SUCCESS != code) goto _failed;

  code = ctgCloneMetaDataArray(pData->pTableHash, ctgCloneVgroupInfo, &pClone->pTableHash);
  if (TSDB_CODE_SUCCESS != code) goto _failed;

  code = ctgCloneMetaDataArray(pData->pTableIndex, ctgCloneTableIndices, &pClone->pTableIndex);
  if (TSDB_CODE_SUCCESS != code) goto _failed;

  code = ctgCloneMetaDataArray(pData->pUdfList, ctgCloneFuncInfo, &pClone->pUdfList);
  if (TSDB_CODE_SUCCESS != code) goto _failed;

  code = ctgCloneMetaDataArray(pData->pIndex, ctgCloneIndexInfo, &pClone->pIndex);
  if (TSDB_CODE_SUCCESS != code) goto _failed;

  code = ctgCloneMetaDataArray(pData->pUser, ctgCloneUserAuth, &pClone->pUser);
  if (TSDB_CODE_SUCCESS != code) goto _failed;

  code = ctgCloneMetaDataArray(pData->pQnodeList, ctgCloneQnodeList, &pClone->pQnodeList);
  if (TSDB_CODE_SUCCESS != code) goto _failed;

  code = ctgCloneMetaDataArray(pData->pTableCfg, ctgCloneTableCfg, &pClone->pTableCfg);
  if (TSDB_CODE_SUCCESS != code) goto _failed;

  code = ctgCloneMetaDataArray(pData->pDnodeList, ctgCloneDnodeList, &pClone->pDnodeList);
  if (TSDB_CODE_SUCCESS != code) goto _failed;

  return pClone;

_failed:

  catalogFreeMetaData(pClone);
  return NULL;
}

/*
 * Releases an SMetaData and everything it holds: each result array is
 * destroyed with the free callback matching its payload type, then pSvrVer
 * and the struct itself are freed. A NULL pData is ignored.
 */
void catalogFreeMetaData(SMetaData* pData) {
  if (NULL != pData) {
    taosArrayDestroyEx(pData->pDbVgroup, ctgFreeDbVgroup);
    taosArrayDestroyEx(pData->pDbCfg, ctgFreeDbCfgInfo);
    taosArrayDestroyEx(pData->pDbInfo, ctgFreeDbInfo);
    taosArrayDestroyEx(pData->pTableMeta, ctgFreeTableMeta);
    taosArrayDestroyEx(pData->pTableHash, ctgFreeVgroupInfo);
    taosArrayDestroyEx(pData->pTableIndex, ctgFreeTableIndices);
    taosArrayDestroyEx(pData->pUdfList, ctgFreeFuncInfo);
    taosArrayDestroyEx(pData->pIndex, ctgFreeIndexInfo);
    taosArrayDestroyEx(pData->pUser, ctgFreeUserAuth);
    taosArrayDestroyEx(pData->pQnodeList, ctgFreeQnodeList);
    taosArrayDestroyEx(pData->pTableCfg, ctgFreeTableCfg);
    taosArrayDestroyEx(pData->pDnodeList, ctgFreeDnodeList);

    taosMemoryFreeClear(pData->pSvrVer);
    taosMemoryFree(pData);
  }
}