You need to sign in or sign up before continuing.
ctgAsync.c 67.4 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "catalogInt.h"
dengyihao's avatar
dengyihao 已提交
17
#include "query.h"
D
dapan1121 已提交
18
#include "systable.h"
dengyihao's avatar
dengyihao 已提交
19
#include "tname.h"
D
dapan1121 已提交
20
#include "tref.h"
dengyihao's avatar
dengyihao 已提交
21
#include "trpc.h"
D
dapan1121 已提交
22

dengyihao's avatar
dengyihao 已提交
23 24
/*
 * Create a CTG_TASK_GET_TB_META task for one table and append it to pJob->pTasks.
 *
 * pJob    - job that receives the task
 * taskIdx - id stored in the task
 * param   - SName* of the table; a private copy is stored in the task context
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if either allocation fails.
 */
int32_t ctgInitGetTbMetaTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
  SName* pTbName = param;

  SCtgTbMetaCtx* pCtx = taosMemoryCalloc(1, sizeof(SCtgTbMetaCtx));
  if (pCtx == NULL) {
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  pCtx->pName = taosMemoryMalloc(sizeof(SName));
  if (pCtx->pName == NULL) {
    taosMemoryFree(pCtx);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  // the context keeps its own copy of the name, independent of the caller's buffer
  memcpy(pCtx->pName, pTbName, sizeof(SName));
  pCtx->flag = CTG_FLAG_UNKNOWN_STB;

  SCtgTask task = {.type = CTG_TASK_GET_TB_META, .taskId = taskIdx, .pJob = pJob, .taskCtx = pCtx};
  taosArrayPush(pJob->pTasks, &task);

  qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx,
         ctgTaskTypeStr(task.type), pTbName->tname);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
54 55
/*
 * Create a CTG_TASK_GET_TB_META_BATCH task and append it to pJob->pTasks.
 *
 * pJob    - job that receives the task
 * taskIdx - id stored in the task
 * param   - array stored as ctx->pNames without copying; the debug log reports its
 *           size as "dbNum"
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the context or the
 * result list cannot be allocated.
 */
int32_t ctgInitGetTbMetasTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
  SCtgTask task = {0};

  task.type = CTG_TASK_GET_TB_META_BATCH;
  task.taskId = taskIdx;
  task.pJob = pJob;

  task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgTbMetasCtx));
  if (NULL == task.taskCtx) {
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  SCtgTbMetasCtx* ctx = task.taskCtx;
  ctx->pNames = param;
  ctx->pResList = taosArrayInit(pJob->tbMetaNum, sizeof(SMetaRes));
  if (NULL == ctx->pResList) {
    // do not register a task whose result list could not be created
    taosMemoryFree(task.taskCtx);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  taosArrayPush(pJob->pTasks, &task);

  // the array size is cast so that it matches the %d conversion
  qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%d, tbNum:%d", pJob->queryId, taskIdx,
         ctgTaskTypeStr(task.type), (int32_t)taosArrayGetSize(ctx->pNames), pJob->tbMetaNum);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
79 80
/*
 * Create a CTG_TASK_GET_DB_VGROUP task and append it to pJob->pTasks.
 *
 * pJob    - job that receives the task
 * taskIdx - id stored in the task
 * param   - db full name; sizeof(ctx->dbFName) bytes are copied from it into the context
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the context cannot be allocated.
 */
int32_t ctgInitGetDbVgTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
  char* pDbFName = param;

  SCtgDbVgCtx* pCtx = taosMemoryCalloc(1, sizeof(SCtgDbVgCtx));
  if (pCtx == NULL) {
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }
  memcpy(pCtx->dbFName, pDbFName, sizeof(pCtx->dbFName));

  SCtgTask task = {.type = CTG_TASK_GET_DB_VGROUP, .taskId = taskIdx, .pJob = pJob, .taskCtx = pCtx};
  taosArrayPush(pJob->pTasks, &task);

  qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbFName:%s", pJob->queryId, taskIdx,
         ctgTaskTypeStr(task.type), pDbFName);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
104 105
/*
 * Create a CTG_TASK_GET_DB_CFG task and append it to pJob->pTasks.
 *
 * pJob    - job that receives the task
 * taskIdx - id stored in the task
 * param   - db full name; sizeof(ctx->dbFName) bytes are copied from it into the context
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the context cannot be allocated.
 */
int32_t ctgInitGetDbCfgTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
  char* pDbFName = param;

  SCtgDbCfgCtx* pCtx = taosMemoryCalloc(1, sizeof(SCtgDbCfgCtx));
  if (pCtx == NULL) {
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }
  memcpy(pCtx->dbFName, pDbFName, sizeof(pCtx->dbFName));

  SCtgTask task = {.type = CTG_TASK_GET_DB_CFG, .taskId = taskIdx, .pJob = pJob, .taskCtx = pCtx};
  taosArrayPush(pJob->pTasks, &task);

  qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbFName:%s", pJob->queryId, taskIdx,
         ctgTaskTypeStr(task.type), pDbFName);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
129 130
/*
 * Create a CTG_TASK_GET_DB_INFO task and append it to pJob->pTasks.
 *
 * pJob    - job that receives the task
 * taskIdx - id stored in the task
 * param   - db full name; sizeof(ctx->dbFName) bytes are copied from it into the context
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the context cannot be allocated.
 */
int32_t ctgInitGetDbInfoTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
  char* pDbFName = param;

  SCtgDbInfoCtx* pCtx = taosMemoryCalloc(1, sizeof(SCtgDbInfoCtx));
  if (pCtx == NULL) {
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }
  memcpy(pCtx->dbFName, pDbFName, sizeof(pCtx->dbFName));

  SCtgTask task = {.type = CTG_TASK_GET_DB_INFO, .taskId = taskIdx, .pJob = pJob, .taskCtx = pCtx};
  taosArrayPush(pJob->pTasks, &task);

  qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbFName:%s", pJob->queryId, taskIdx,
         ctgTaskTypeStr(task.type), pDbFName);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
154 155
/*
 * Create a CTG_TASK_GET_TB_HASH task for one table and append it to pJob->pTasks.
 *
 * pJob    - job that receives the task
 * taskIdx - id stored in the task
 * param   - SName* of the table; a private copy is stored in the task context and
 *           ctx->dbFName is filled from that copy via tNameGetFullDbName()
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if either allocation fails.
 */
int32_t ctgInitGetTbHashTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
  SName* pTbName = param;

  SCtgTbHashCtx* pCtx = taosMemoryCalloc(1, sizeof(SCtgTbHashCtx));
  if (pCtx == NULL) {
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  pCtx->pName = taosMemoryMalloc(sizeof(SName));
  if (pCtx->pName == NULL) {
    taosMemoryFree(pCtx);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  memcpy(pCtx->pName, pTbName, sizeof(SName));
  tNameGetFullDbName(pCtx->pName, pCtx->dbFName);

  SCtgTask task = {.type = CTG_TASK_GET_TB_HASH, .taskId = taskIdx, .pJob = pJob, .taskCtx = pCtx};
  taosArrayPush(pJob->pTasks, &task);

  qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tableName:%s", pJob->queryId, taskIdx,
         ctgTaskTypeStr(task.type), pTbName->tname);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
185 186
/*
 * Create a CTG_TASK_GET_TB_HASH_BATCH task and append it to pJob->pTasks.
 *
 * pJob    - job that receives the task
 * taskIdx - id stored in the task
 * param   - array stored as ctx->pNames without copying; the debug log reports its
 *           size as "dbNum"
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the context or the
 * result list cannot be allocated.
 */
int32_t ctgInitGetTbHashsTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
  SCtgTask task = {0};

  task.type = CTG_TASK_GET_TB_HASH_BATCH;
  task.taskId = taskIdx;
  task.pJob = pJob;

  task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgTbHashsCtx));
  if (NULL == task.taskCtx) {
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  SCtgTbHashsCtx* ctx = task.taskCtx;
  ctx->pNames = param;
  ctx->pResList = taosArrayInit(pJob->tbHashNum, sizeof(SMetaRes));
  if (NULL == ctx->pResList) {
    // do not register a task whose result list could not be created
    taosMemoryFree(task.taskCtx);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  taosArrayPush(pJob->pTasks, &task);

  // the array size is cast so that it matches the %d conversion
  qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%d, tbNum:%d", pJob->queryId, taskIdx,
         ctgTaskTypeStr(task.type), (int32_t)taosArrayGetSize(ctx->pNames), pJob->tbHashNum);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
210
/*
 * Create a CTG_TASK_GET_QNODE task (no task context) and append it to pJob->pTasks.
 * param is not used. Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgInitGetQnodeTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
  SCtgTask task = {.type = CTG_TASK_GET_QNODE, .taskId = taskIdx, .pJob = pJob, .taskCtx = NULL};

  taosArrayPush(pJob->pTasks, &task);

  qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type));

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
225
/*
 * Create a CTG_TASK_GET_DNODE task (no task context) and append it to pJob->pTasks.
 * param is not used. Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgInitGetDnodeTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
  SCtgTask task = {.type = CTG_TASK_GET_DNODE, .taskId = taskIdx, .pJob = pJob, .taskCtx = NULL};

  taosArrayPush(pJob->pTasks, &task);

  qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type));

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
240 241
/*
 * Create a CTG_TASK_GET_INDEX task and append it to pJob->pTasks.
 *
 * pJob    - job that receives the task
 * taskIdx - id stored in the task
 * param   - NUL-terminated index full name, copied into ctx->indexFName
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the context cannot be allocated.
 */
int32_t ctgInitGetIndexTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
  char*    name = (char*)param;
  SCtgTask task = {0};

  task.type = CTG_TASK_GET_INDEX;
  task.taskId = taskIdx;
  task.pJob = pJob;

  task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgIndexCtx));
  if (NULL == task.taskCtx) {
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  SCtgIndexCtx* ctx = task.taskCtx;

  // Bounded copy: an over-long name must not overflow the context buffer.
  // The context was zero-filled by calloc, so copying at most size-1 bytes
  // always leaves a terminating NUL in place.
  strncpy(ctx->indexFName, name, sizeof(ctx->indexFName) - 1);

  taosArrayPush(pJob->pTasks, &task);

  qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, indexFName:%s", pJob->queryId, taskIdx,
         ctgTaskTypeStr(task.type), name);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
265 266
/*
 * Create a CTG_TASK_GET_UDF task and append it to pJob->pTasks.
 *
 * pJob    - job that receives the task
 * taskIdx - id stored in the task
 * param   - NUL-terminated function name, copied into ctx->udfName
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the context cannot be allocated.
 */
int32_t ctgInitGetUdfTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
  char*    name = (char*)param;
  SCtgTask task = {0};

  task.type = CTG_TASK_GET_UDF;
  task.taskId = taskIdx;
  task.pJob = pJob;

  task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgUdfCtx));
  if (NULL == task.taskCtx) {
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  SCtgUdfCtx* ctx = task.taskCtx;

  // Bounded copy: an over-long name must not overflow the context buffer.
  // The context was zero-filled by calloc, so copying at most size-1 bytes
  // always leaves a terminating NUL in place.
  strncpy(ctx->udfName, name, sizeof(ctx->udfName) - 1);

  taosArrayPush(pJob->pTasks, &task);

  qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, udfName:%s", pJob->queryId, taskIdx,
         ctgTaskTypeStr(task.type), name);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
290 291 292
/*
 * Create a CTG_TASK_GET_USER task and append it to pJob->pTasks.
 *
 * pJob    - job that receives the task
 * taskIdx - id stored in the task
 * param   - SUserAuthInfo* describing the request; copied by value into ctx->user
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the context cannot be allocated.
 */
int32_t ctgInitGetUserTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
  SUserAuthInfo* pAuth = param;

  SCtgUserCtx* pCtx = taosMemoryCalloc(1, sizeof(SCtgUserCtx));
  if (pCtx == NULL) {
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }
  memcpy(&pCtx->user, pAuth, sizeof(SUserAuthInfo));

  SCtgTask task = {.type = CTG_TASK_GET_USER, .taskId = taskIdx, .pJob = pJob, .taskCtx = pCtx};
  taosArrayPush(pJob->pTasks, &task);

  qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, user:%s", pJob->queryId, taskIdx,
         ctgTaskTypeStr(task.type), pAuth->user);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
315
/*
 * Create a CTG_TASK_GET_SVR_VER task (no task context) and append it to pJob->pTasks.
 * param is not used. Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgInitGetSvrVerTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
  SCtgTask task = {.type = CTG_TASK_GET_SVR_VER, .taskId = taskIdx, .pJob = pJob};

  taosArrayPush(pJob->pTasks, &task);

  qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type));

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
329 330
/*
 * Create a CTG_TASK_GET_TB_INDEX task for one table and append it to pJob->pTasks.
 *
 * pJob    - job that receives the task
 * taskIdx - id stored in the task
 * param   - SName* of the table; a private copy is stored in the task context
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if either allocation fails.
 */
int32_t ctgInitGetTbIndexTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
  SName* pTbName = param;

  SCtgTbIndexCtx* pCtx = taosMemoryCalloc(1, sizeof(SCtgTbIndexCtx));
  if (pCtx == NULL) {
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  pCtx->pName = taosMemoryMalloc(sizeof(SName));
  if (pCtx->pName == NULL) {
    taosMemoryFree(pCtx);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }
  memcpy(pCtx->pName, pTbName, sizeof(SName));

  SCtgTask task = {.type = CTG_TASK_GET_TB_INDEX, .taskId = taskIdx, .pJob = pJob, .taskCtx = pCtx};
  taosArrayPush(pJob->pTasks, &task);

  qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx,
         ctgTaskTypeStr(task.type), pTbName->tname);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
359 360
/*
 * Create a CTG_TASK_GET_TB_CFG task for one table and append it to pJob->pTasks.
 *
 * pJob    - job that receives the task
 * taskIdx - id stored in the task
 * param   - SName* of the table; a private copy is stored in the task context
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if either allocation fails.
 */
int32_t ctgInitGetTbCfgTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
  SName* pTbName = param;

  SCtgTbCfgCtx* pCtx = taosMemoryCalloc(1, sizeof(SCtgTbCfgCtx));
  if (pCtx == NULL) {
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  pCtx->pName = taosMemoryMalloc(sizeof(SName));
  if (pCtx->pName == NULL) {
    taosMemoryFree(pCtx);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }
  memcpy(pCtx->pName, pTbName, sizeof(SName));

  SCtgTask task = {.type = CTG_TASK_GET_TB_CFG, .taskId = taskIdx, .pJob = pJob, .taskCtx = pCtx};
  taosArrayPush(pJob->pTasks, &task);

  qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx,
         ctgTaskTypeStr(task.type), pTbName->tname);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
389
/*
 * Collect every database and table referenced by a request and issue the
 * corresponding cache-drop calls for them.
 *
 * Database names come from pDbVgroup, pDbCfg, pDbInfo, the dbFName of every
 * entry in pTableMeta / pTableHash, and the database of every pTableCfg name.
 * Table names come from the pTables lists of pTableMeta / pTableHash and from
 * pTableCfg. Both sets are gathered in hash tables keyed by the name, and
 * ctgDropDbVgroupEnqueue() / ctgRemoveTbMeta() are then called once per stored
 * entry. Finally ctgDropTbIndexEnqueue() is called for each pTableIndex name.
 *
 * pCtg    - catalog handle passed to the drop/remove calls
 * taskNum - initial capacity hint for both hash tables
 * pJob    - supplies the per-kind request counts
 * pReq    - the request whose arrays are scanned
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if a hash table cannot be created.
 */
int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob* pJob, const SCatalogReq* pReq) {
  SHashObj* pDbSet = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  SHashObj* pTbSet = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (!(pDbSet && pTbSet)) {
    taosHashCleanup(pDbSet);
    taosHashCleanup(pTbSet);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  // plain db-name request arrays, visited in the order vgroup -> cfg -> info
  SArray* const dbLists[] = {pReq->pDbVgroup, pReq->pDbCfg, pReq->pDbInfo};
  const int32_t dbCounts[] = {pJob->dbVgNum, pJob->dbCfgNum, pJob->dbInfoNum};
  for (int32_t k = 0; k < 3; ++k) {
    for (int32_t i = 0; i < dbCounts[k]; ++i) {
      char* fname = taosArrayGet(dbLists[k], i);
      taosHashPut(pDbSet, fname, strlen(fname), fname, TSDB_DB_FNAME_LEN);
    }
  }

  // per-db table request arrays, visited in the order table meta -> table hash
  SArray* const tbLists[] = {pReq->pTableMeta, pReq->pTableHash};
  for (int32_t k = 0; k < 2; ++k) {
    int32_t reqDbNum = taosArrayGetSize(tbLists[k]);
    for (int32_t i = 0; i < reqDbNum; ++i) {
      STablesReq* pTbReq = taosArrayGet(tbLists[k], i);
      taosHashPut(pDbSet, pTbReq->dbFName, strlen(pTbReq->dbFName), pTbReq->dbFName, TSDB_DB_FNAME_LEN);

      int32_t reqTbNum = taosArrayGetSize(pTbReq->pTables);
      for (int32_t m = 0; m < reqTbNum; ++m) {
        SName* pTbName = taosArrayGet(pTbReq->pTables, m);
        taosHashPut(pTbSet, pTbName, sizeof(SName), pTbName, sizeof(SName));
      }
    }
  }

  // databases of the table-cfg requests
  for (int32_t i = 0; i < pJob->tbCfgNum; ++i) {
    SName* pTbName = taosArrayGet(pReq->pTableCfg, i);
    char   fname[TSDB_DB_FNAME_LEN];
    tNameGetFullDbName(pTbName, fname);
    taosHashPut(pDbSet, fname, strlen(fname), fname, TSDB_DB_FNAME_LEN);
  }

  for (char* pIter = taosHashIterate(pDbSet, NULL); pIter != NULL; pIter = taosHashIterate(pDbSet, pIter)) {
    ctgDropDbVgroupEnqueue(pCtg, pIter, true);
  }

  taosHashCleanup(pDbSet);

  // REFRESH TABLE META

  for (int32_t i = 0; i < pJob->tbCfgNum; ++i) {
    SName* pTbName = taosArrayGet(pReq->pTableCfg, i);
    taosHashPut(pTbSet, pTbName, sizeof(SName), pTbName, sizeof(SName));
  }

  for (SName* pIter = taosHashIterate(pTbSet, NULL); pIter != NULL; pIter = taosHashIterate(pTbSet, pIter)) {
    ctgRemoveTbMeta(pCtg, pIter);
  }

  taosHashCleanup(pTbSet);

  for (int32_t i = 0; i < pJob->tbIndexNum; ++i) {
    SName* pTbName = taosArrayGet(pReq->pTableIndex, i);
    ctgDropTbIndexEnqueue(pCtg, pTbName, true);
  }

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
472

dengyihao's avatar
dengyihao 已提交
473
/*
 * Allocate the next task id of a job and run the type-specific init function
 * under the job's task write lock.
 *
 * pJob   - job the task is added to; pJob->taskIdx is atomically incremented
 * type   - selects the init function from gCtgAsyncFps
 * param  - passed through to the init function
 * taskId - optional out parameter; receives the new task id on success
 *
 * Returns TSDB_CODE_SUCCESS or the error code of the init function.
 */
int32_t ctgInitTask(SCtgJob* pJob, CTG_TASK_TYPE type, void* param, int32_t* taskId) {
  int32_t tid = atomic_fetch_add_32(&pJob->taskIdx, 1);

  CTG_LOCK(CTG_WRITE, &pJob->taskLock);
  int32_t code = (*gCtgAsyncFps[type].initFp)(pJob, tid, param);
  // release the lock before checking the result, so that a failed init
  // cannot return with taskLock still held
  CTG_UNLOCK(CTG_WRITE, &pJob->taskLock);

  CTG_ERR_RET(code);

  if (taskId) {
    *taskId = tid;
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
487 488
/*
 * Allocate and populate a catalog job for an asynchronous request.
 *
 * The per-kind request counts are taken from pReq, stored in the job, and one
 * task is created per requested item (table meta and table hash requests are
 * each served by a single batch task). When pReq->forceUpdate is set and at
 * least one task is requested, ctgHandleForceUpdate() runs before any task is
 * created. The job is finally registered in gCtgMgmt.jobPool and acquired once.
 *
 * pCtg  - catalog handle stored in the job
 * pConn - connection info; copied by value into the job, requestId becomes queryId
 * job   - out parameter; receives the new job on success and is reset to NULL
 *         on failure (the partially built job is released with ctgFreeJob())
 * pReq  - the request describing what to fetch
 * fp    - user callback stored in the job
 * param - user parameter stored in the job
 *
 * Returns TSDB_CODE_SUCCESS or an error code.
 */
int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const SCatalogReq* pReq, catalogCallback fp,
                   void* param) {
  int32_t code = 0;
  int32_t tbMetaNum = (int32_t)ctgGetTablesReqNum(pReq->pTableMeta);
  int32_t dbVgNum = (int32_t)taosArrayGetSize(pReq->pDbVgroup);
  int32_t tbHashNum = (int32_t)ctgGetTablesReqNum(pReq->pTableHash);
  int32_t udfNum = (int32_t)taosArrayGetSize(pReq->pUdf);
  int32_t qnodeNum = pReq->qNodeRequired ? 1 : 0;
  int32_t dnodeNum = pReq->dNodeRequired ? 1 : 0;
  int32_t svrVerNum = pReq->svrVerRequired ? 1 : 0;
  int32_t dbCfgNum = (int32_t)taosArrayGetSize(pReq->pDbCfg);
  int32_t indexNum = (int32_t)taosArrayGetSize(pReq->pIndex);
  int32_t userNum = (int32_t)taosArrayGetSize(pReq->pUser);
  int32_t dbInfoNum = (int32_t)taosArrayGetSize(pReq->pDbInfo);
  int32_t tbIndexNum = (int32_t)taosArrayGetSize(pReq->pTableIndex);
  int32_t tbCfgNum = (int32_t)taosArrayGetSize(pReq->pTableCfg);

  int32_t taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dnodeNum + svrVerNum + dbCfgNum + indexNum +
                    userNum + dbInfoNum + tbIndexNum + tbCfgNum;

  *job = taosMemoryCalloc(1, sizeof(SCtgJob));
  if (NULL == *job) {
    ctgError("failed to calloc, size:%d, reqId:0x%" PRIx64, (int32_t)sizeof(SCtgJob), pConn->requestId);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  SCtgJob* pJob = *job;

  pJob->subTaskNum = taskNum;
  pJob->queryId = pConn->requestId;
  pJob->userFp = fp;
  pJob->pCtg = pCtg;
  pJob->conn = *pConn;
  pJob->userParam = param;

  pJob->tbMetaNum = tbMetaNum;
  pJob->tbHashNum = tbHashNum;
  pJob->qnodeNum = qnodeNum;
  pJob->dnodeNum = dnodeNum;
  pJob->dbVgNum = dbVgNum;
  pJob->udfNum = udfNum;
  pJob->dbCfgNum = dbCfgNum;
  pJob->indexNum = indexNum;
  pJob->userNum = userNum;
  pJob->dbInfoNum = dbInfoNum;
  pJob->tbIndexNum = tbIndexNum;
  pJob->tbCfgNum = tbCfgNum;
  pJob->svrVerNum = svrVerNum;

#if CTG_BATCH_FETCH
  pJob->pBatchs =
      taosHashInit(CTG_DEFAULT_BATCH_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (NULL == pJob->pBatchs) {
    ctgError("taosHashInit %d batch failed", CTG_DEFAULT_BATCH_NUM);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }
#endif

  pJob->pTasks = taosArrayInit(taskNum, sizeof(SCtgTask));
  if (NULL == pJob->pTasks) {
    ctgError("taosArrayInit %d tasks failed", taskNum);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  if (pReq->forceUpdate && taskNum) {
    CTG_ERR_JRET(ctgHandleForceUpdate(pCtg, taskNum, pJob, pReq));
  }

  for (int32_t i = 0; i < dbVgNum; ++i) {
    char* dbFName = taosArrayGet(pReq->pDbVgroup, i);
    CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_DB_VGROUP, dbFName, NULL));
  }

  for (int32_t i = 0; i < dbCfgNum; ++i) {
    char* dbFName = taosArrayGet(pReq->pDbCfg, i);
    CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_DB_CFG, dbFName, NULL));
  }

  for (int32_t i = 0; i < dbInfoNum; ++i) {
    char* dbFName = taosArrayGet(pReq->pDbInfo, i);
    CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_DB_INFO, dbFName, NULL));
  }

#if 0
  for (int32_t i = 0; i < tbMetaNum; ++i) {
    SName* name = taosArrayGet(pReq->pTableMeta, i);
    CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_META, name, NULL));
  }
#else
  // all table-meta requests are handled by one batch task
  if (tbMetaNum > 0) {
    CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_META_BATCH, pReq->pTableMeta, NULL));
  }
#endif

#if 0
  for (int32_t i = 0; i < tbHashNum; ++i) {
    SName* name = taosArrayGet(pReq->pTableHash, i);
    CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_HASH, name, NULL));
  }
#else
  // all table-hash requests are handled by one batch task
  if (tbHashNum > 0) {
    CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_HASH_BATCH, pReq->pTableHash, NULL));
  }
#endif

  for (int32_t i = 0; i < tbIndexNum; ++i) {
    SName* name = taosArrayGet(pReq->pTableIndex, i);
    CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_INDEX, name, NULL));
  }

  for (int32_t i = 0; i < tbCfgNum; ++i) {
    SName* name = taosArrayGet(pReq->pTableCfg, i);
    CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_CFG, name, NULL));
  }

  for (int32_t i = 0; i < indexNum; ++i) {
    char* indexName = taosArrayGet(pReq->pIndex, i);
    CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_INDEX, indexName, NULL));
  }

  for (int32_t i = 0; i < udfNum; ++i) {
    char* udfName = taosArrayGet(pReq->pUdf, i);
    CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_UDF, udfName, NULL));
  }

  for (int32_t i = 0; i < userNum; ++i) {
    SUserAuthInfo* user = taosArrayGet(pReq->pUser, i);
    CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_USER, user, NULL));
  }

  if (qnodeNum) {
    CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_QNODE, NULL, NULL));
  }

  if (dnodeNum) {
    CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_DNODE, NULL, NULL));
  }

  if (svrVerNum) {
    CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_SVR_VER, NULL, NULL));
  }

  pJob->refId = taosAddRef(gCtgMgmt.jobPool, pJob);
  if (pJob->refId < 0) {
    ctgError("add job to ref failed, error: %s", tstrerror(terrno));
    CTG_ERR_JRET(terrno);
  }

  taosAcquireRef(gCtgMgmt.jobPool, pJob->refId);

  qDebug("QID:0x%" PRIx64 ", jobId: 0x%" PRIx64 " initialized, task num %d, forceUpdate %d", pJob->queryId, pJob->refId,
         taskNum, pReq->forceUpdate);
  return TSDB_CODE_SUCCESS;

_return:

  ctgFreeJob(*job);
  // the job is gone; make sure the caller cannot touch the released memory
  *job = NULL;
  CTG_RET(code);
}

/*
 * Append the task's {code, res} pair to the job's pTableMeta result array,
 * creating the array (capacity tbMetaNum) on first use.
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the array cannot be created.
 */
int32_t ctgDumpTbMetaRes(SCtgTask* pTask) {
  SCtgJob* pOwner = pTask->pJob;
  SMetaRes entry = {0};

  if (pOwner->jobRes.pTableMeta == NULL) {
    pOwner->jobRes.pTableMeta = taosArrayInit(pOwner->tbMetaNum, sizeof(entry));
  }
  if (pOwner->jobRes.pTableMeta == NULL) {
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  entry.code = pTask->code;
  entry.pRes = pTask->res;
  taosArrayPush(pOwner->jobRes.pTableMeta, &entry);

  return TSDB_CODE_SUCCESS;
}

662
/*
 * Store the batch task's result pointer directly as the job's pTableMeta result.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgDumpTbMetasRes(SCtgTask* pTask) {
  pTask->pJob->jobRes.pTableMeta = pTask->res;
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
670 671 672
/*
 * Append the task's {code, res} pair to the job's pDbVgroup result array,
 * creating the array (capacity dbVgNum) on first use.
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the array cannot be created.
 */
int32_t ctgDumpDbVgRes(SCtgTask* pTask) {
  SCtgJob* pOwner = pTask->pJob;
  SMetaRes entry = {0};

  if (pOwner->jobRes.pDbVgroup == NULL) {
    pOwner->jobRes.pDbVgroup = taosArrayInit(pOwner->dbVgNum, sizeof(entry));
  }
  if (pOwner->jobRes.pDbVgroup == NULL) {
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  entry.code = pTask->code;
  entry.pRes = pTask->res;
  taosArrayPush(pOwner->jobRes.pDbVgroup, &entry);

  return TSDB_CODE_SUCCESS;
}

/*
 * Append the task's {code, res} pair to the job's pTableHash result array,
 * creating the array (capacity tbHashNum) on first use.
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the array cannot be created.
 */
int32_t ctgDumpTbHashRes(SCtgTask* pTask) {
  SCtgJob* pOwner = pTask->pJob;
  SMetaRes entry = {0};

  if (pOwner->jobRes.pTableHash == NULL) {
    pOwner->jobRes.pTableHash = taosArrayInit(pOwner->tbHashNum, sizeof(entry));
  }
  if (pOwner->jobRes.pTableHash == NULL) {
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  entry.code = pTask->code;
  entry.pRes = pTask->res;
  taosArrayPush(pOwner->jobRes.pTableHash, &entry);

  return TSDB_CODE_SUCCESS;
}

700
/*
 * Store the batch task's result pointer directly as the job's pTableHash result.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgDumpTbHashsRes(SCtgTask* pTask) {
  pTask->pJob->jobRes.pTableHash = pTask->res;
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
708 709 710 711 712 713 714 715 716 717
/*
 * Append the task's {code, res} pair to the job's pTableIndex result array,
 * creating the array (capacity tbIndexNum) on first use.
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the array cannot be created.
 */
int32_t ctgDumpTbIndexRes(SCtgTask* pTask) {
  SCtgJob* pOwner = pTask->pJob;
  SMetaRes entry = {0};

  if (pOwner->jobRes.pTableIndex == NULL) {
    pOwner->jobRes.pTableIndex = taosArrayInit(pOwner->tbIndexNum, sizeof(entry));
  }
  if (pOwner->jobRes.pTableIndex == NULL) {
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  entry.code = pTask->code;
  entry.pRes = pTask->res;
  taosArrayPush(pOwner->jobRes.pTableIndex, &entry);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
723 724 725 726 727 728 729 730 731 732 733 734 735 736 737
/*
 * Append the task's {code, res} pair to the job's pTableCfg result array,
 * creating the array (capacity tbCfgNum) on first use.
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the array cannot be created.
 */
int32_t ctgDumpTbCfgRes(SCtgTask* pTask) {
  SCtgJob* pOwner = pTask->pJob;
  SMetaRes entry = {0};

  if (pOwner->jobRes.pTableCfg == NULL) {
    pOwner->jobRes.pTableCfg = taosArrayInit(pOwner->tbCfgNum, sizeof(entry));
  }
  if (pOwner->jobRes.pTableCfg == NULL) {
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  entry.code = pTask->code;
  entry.pRes = pTask->res;
  taosArrayPush(pOwner->jobRes.pTableCfg, &entry);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
738 739 740
/*
 * Append the task's {code, res} pair to the job's pIndex result array,
 * creating the array (capacity indexNum) on first use.
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the array cannot be created.
 */
int32_t ctgDumpIndexRes(SCtgTask* pTask) {
  SCtgJob* pOwner = pTask->pJob;
  SMetaRes entry = {0};

  if (pOwner->jobRes.pIndex == NULL) {
    pOwner->jobRes.pIndex = taosArrayInit(pOwner->indexNum, sizeof(entry));
  }
  if (pOwner->jobRes.pIndex == NULL) {
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  entry.code = pTask->code;
  entry.pRes = pTask->res;
  taosArrayPush(pOwner->jobRes.pIndex, &entry);

  return TSDB_CODE_SUCCESS;
}

/*
 * Append the task's {code, res} pair to the job's pQnodeList result array,
 * creating the array (capacity 1) on first use.
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the array cannot be created.
 */
int32_t ctgDumpQnodeRes(SCtgTask* pTask) {
  SCtgJob* pOwner = pTask->pJob;
  SMetaRes entry = {0};

  if (pOwner->jobRes.pQnodeList == NULL) {
    pOwner->jobRes.pQnodeList = taosArrayInit(1, sizeof(entry));
  }
  if (pOwner->jobRes.pQnodeList == NULL) {
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  entry.code = pTask->code;
  entry.pRes = pTask->res;
  taosArrayPush(pOwner->jobRes.pQnodeList, &entry);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
768 769 770 771 772 773 774 775 776 777 778 779 780 781 782
/*
 * Append the task's {code, res} pair to the job's pDnodeList result array,
 * creating the array (capacity 1) on first use.
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the array cannot be created.
 */
int32_t ctgDumpDnodeRes(SCtgTask* pTask) {
  SCtgJob* pOwner = pTask->pJob;
  SMetaRes entry = {0};

  if (pOwner->jobRes.pDnodeList == NULL) {
    pOwner->jobRes.pDnodeList = taosArrayInit(1, sizeof(entry));
  }
  if (pOwner->jobRes.pDnodeList == NULL) {
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  entry.code = pTask->code;
  entry.pRes = pTask->res;
  taosArrayPush(pOwner->jobRes.pDnodeList, &entry);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
783 784 785
/*
 * Append the task's {code, res} pair to the job's pDbCfg result array,
 * creating the array (capacity dbCfgNum) on first use.
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the array cannot be created.
 */
int32_t ctgDumpDbCfgRes(SCtgTask* pTask) {
  SCtgJob* pOwner = pTask->pJob;
  SMetaRes entry = {0};

  if (pOwner->jobRes.pDbCfg == NULL) {
    pOwner->jobRes.pDbCfg = taosArrayInit(pOwner->dbCfgNum, sizeof(entry));
  }
  if (pOwner->jobRes.pDbCfg == NULL) {
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  entry.code = pTask->code;
  entry.pRes = pTask->res;
  taosArrayPush(pOwner->jobRes.pDbCfg, &entry);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
798 799 800
/*
 * Append the task's {code, res} pair to the job's pDbInfo result array,
 * creating the array (capacity dbInfoNum) on first use.
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if the array cannot be created.
 */
int32_t ctgDumpDbInfoRes(SCtgTask* pTask) {
  SCtgJob* pOwner = pTask->pJob;
  SMetaRes entry = {0};

  if (pOwner->jobRes.pDbInfo == NULL) {
    pOwner->jobRes.pDbInfo = taosArrayInit(pOwner->dbInfoNum, sizeof(entry));
  }
  if (pOwner->jobRes.pDbInfo == NULL) {
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  entry.code = pTask->code;
  entry.pRes = pTask->res;
  taosArrayPush(pOwner->jobRes.pDbInfo, &entry);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
813 814 815
/*
 * Append a finished UDF task to the job result.
 *
 * pJob->jobRes.pUdfList is created on first use with an initial capacity of
 * pJob->udfNum SMetaRes entries; each call appends one entry holding the
 * task's final code and result pointer.
 *
 * Returns TSDB_CODE_SUCCESS, or reports TSDB_CODE_OUT_OF_MEMORY through
 * CTG_ERR_RET when the list cannot be created or the entry cannot be appended.
 */
int32_t ctgDumpUdfRes(SCtgTask* pTask) {
  SCtgJob* pJob = pTask->pJob;

  if (NULL == pJob->jobRes.pUdfList) {
    pJob->jobRes.pUdfList = taosArrayInit(pJob->udfNum, sizeof(SMetaRes));
    if (NULL == pJob->jobRes.pUdfList) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
  }

  SMetaRes res = {.code = pTask->code, .pRes = pTask->res};
  // A failed append must not be reported as success with the entry missing.
  if (NULL == taosArrayPush(pJob->jobRes.pUdfList, &res)) {
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Append a finished user-auth task to the job result.
 *
 * pJob->jobRes.pUser is created on first use with an initial capacity of
 * pJob->userNum SMetaRes entries; each call appends one entry holding the
 * task's final code and result pointer.
 *
 * Returns TSDB_CODE_SUCCESS, or reports TSDB_CODE_OUT_OF_MEMORY through
 * CTG_ERR_RET when the list cannot be created or the entry cannot be appended.
 */
int32_t ctgDumpUserRes(SCtgTask* pTask) {
  SCtgJob* pJob = pTask->pJob;

  if (NULL == pJob->jobRes.pUser) {
    pJob->jobRes.pUser = taosArrayInit(pJob->userNum, sizeof(SMetaRes));
    if (NULL == pJob->jobRes.pUser) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
  }

  SMetaRes res = {.code = pTask->code, .pRes = pTask->res};
  // A failed append must not be reported as success with the entry missing.
  if (NULL == taosArrayPush(pJob->jobRes.pUser, &res)) {
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
843 844 845 846 847 848 849 850 851 852 853
/*
 * Store the server-version task outcome in the job result.
 *
 * Unlike the list-shaped results, jobRes.pSvrVer is a single zero-initialised
 * SMetaRes allocated on first use; the task's code and result pointer are
 * written into it on every call.
 *
 * Returns TSDB_CODE_SUCCESS, or reports TSDB_CODE_OUT_OF_MEMORY through
 * CTG_ERR_RET when the slot cannot be allocated.
 */
int32_t ctgDumpSvrVer(SCtgTask* pTask) {
  SCtgJob* job = pTask->pJob;

  if (NULL == job->jobRes.pSvrVer) {
    job->jobRes.pSvrVer = taosMemoryCalloc(1, sizeof(SMetaRes));
    if (NULL == job->jobRes.pSvrVer) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
  }

  job->jobRes.pSvrVer->pRes = pTask->res;
  job->jobRes.pSvrVer->code = pTask->code;

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
858
/*
 * Hand this task's outcome to every parent task registered in pParents.
 *
 * For each parent, in array order:
 *   - the parent's sub-result code is set to this task's code;
 *   - if this task succeeded, its result is cloned into the parent through the
 *     per-type cloneFp, and a clone failure replaces the sub-result code;
 *   - the parent's message context takes over this task's pBatchs pointer;
 *   - the parent's subRes.fp callback is invoked.
 * A failing callback stops the walk. The task's lock is taken in write mode
 * for the whole walk and dropped before returning.
 *
 * Returns the value left in `code` by the last step executed.
 */
int32_t ctgCallSubCb(SCtgTask* pTask) {
  int32_t code = 0;

  CTG_LOCK(CTG_WRITE, &pTask->lock);

  int32_t nParents = taosArrayGetSize(pTask->pParents);
  for (int32_t idx = 0; idx < nParents; ++idx) {
    SCtgMsgCtx* pSrcMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
    SCtgTask*   pParent = taosArrayGetP(pTask->pParents, idx);

    pParent->subRes.code = pTask->code;
    if (TSDB_CODE_SUCCESS == pTask->code) {
      code = (*gCtgAsyncFps[pTask->type].cloneFp)(pTask, &pParent->subRes.res);
      if (code) {
        pParent->subRes.code = code;
      }
    }

    SCtgMsgCtx* pDstMsgCtx = CTG_GET_TASK_MSGCTX(pParent, -1);
    pDstMsgCtx->pBatchs = pSrcMsgCtx->pBatchs;

    CTG_ERR_JRET(pParent->subRes.fp(pParent));
  }

_return:

  CTG_UNLOCK(CTG_WRITE, &pTask->lock);

  CTG_RET(code);
}

D
dapan1121 已提交
889 890
int32_t ctgCallUserCb(void* param) {
  SCtgJob* pJob = (SCtgJob*)param;
D
dapan1121 已提交
891 892

  qDebug("QID:0x%" PRIx64 " ctg start to call user cb with rsp %s", pJob->queryId, tstrerror(pJob->jobResCode));
893

D
dapan1121 已提交
894 895
  (*pJob->userFp)(&pJob->jobRes, pJob->userParam, pJob->jobResCode);

D
dapan1121 已提交
896 897
  qDebug("QID:0x%" PRIx64 " ctg end to call user cb", pJob->queryId);

D
dapan1121 已提交
898 899 900 901
  taosRemoveRef(gCtgMgmt.jobPool, pJob->refId);

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
902

D
dapan1121 已提交
903 904
/*
 * Mark a task as finished and, once every task of the job has finished,
 * build the job result and schedule the user callback.
 *
 * A task already in CTG_TASK_DONE is ignored. Otherwise the response code is
 * stored on the task, the task is marked done, its parents are notified via
 * ctgCallSubCb (return value not inspected), and the job's done-counter is
 * atomically incremented. Only the call that brings the counter up to the
 * number of tasks goes on to ctgMakeAsyncRes; its code becomes the job result
 * code and ctgCallUserCb is queued through taosAsyncExec.
 *
 * Returns TSDB_CODE_SUCCESS for the early exits, otherwise the code produced
 * by ctgMakeAsyncRes.
 */
int32_t ctgHandleTaskEnd(SCtgTask* pTask, int32_t rspCode) {
  int32_t  code = 0;
  SCtgJob* pJob = pTask->pJob;

  if (CTG_TASK_DONE == pTask->status) {
    return TSDB_CODE_SUCCESS;
  }

  qDebug("QID:0x%" PRIx64 " task %d end with res %s", pJob->queryId, pTask->taskId, tstrerror(rspCode));

  pTask->code = rspCode;
  pTask->status = CTG_TASK_DONE;

  ctgCallSubCb(pTask);

  int32_t finished = atomic_add_fetch_32(&pJob->taskDone, 1);
  if (finished < taosArrayGetSize(pJob->pTasks)) {
    // Other tasks of this job are still outstanding.
    qDebug("QID:0x%" PRIx64 " task done: %d, total: %d", pJob->queryId, finished,
           (int32_t)taosArrayGetSize(pJob->pTasks));
    return TSDB_CODE_SUCCESS;
  }

  CTG_ERR_JRET(ctgMakeAsyncRes(pJob));

_return:

  pJob->jobResCode = code;

  taosAsyncExec(ctgCallUserCb, pJob, NULL);

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
939 940 941 942 943
/*
 * Response handler for a single-table meta task.
 *
 * The response is first decoded by ctgProcessRspMsg into the task's message
 * context; what happens next depends on which request it answers:
 *
 *   TDMT_MND_USE_DB      - the vgroup for the table is looked up in the
 *                          returned db vgroup info and a vnode meta request
 *                          is issued; the handler returns without ending the
 *                          task.
 *   TDMT_MND_TABLE_META  - an empty answer for a table flagged as stb falls
 *                          back to the vnode (via cached vgroup info, or by
 *                          first requesting the db vgroup info from the
 *                          mnode); an empty answer otherwise removes the
 *                          table from the cache and fails the task. If a
 *                          previous output was parked in lastOut, it is
 *                          swapped back and receives the new tbMeta.
 *   TDMT_VND_TABLE_META  - an empty answer fails the task; a super table, or
 *                          a child table whose stb meta is not available from
 *                          the cache, triggers a follow-up mnode request.
 *
 * When a final answer is available the cache is updated, the child-table
 * fields are copied over the stb meta for CTG_IS_META_BOTH results, and the
 * meta is moved into pTask->res. ctgHandleTaskEnd is called from _return
 * whenever a result is present or an error occurred.
 *
 * Returns TSDB_CODE_SUCCESS when a follow-up request was issued or the task
 * completed, otherwise the error code.
 */
int32_t ctgHandleGetTbMetaRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
  int32_t           code = 0;
  SCtgDBCache*      dbCache = NULL;
  SCtgTask*         pTask = tReq->pTask;
  SCatalog*         pCtg = pTask->pJob->pCtg;
  SRequestConnInfo* pConn = &pTask->pJob->conn;
  SCtgMsgCtx*       pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx);
  SCtgTbMetaCtx*    ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
  SName*            pName = ctx->pName;
  int32_t           flag = ctx->flag;
  int32_t*          vgId = &ctx->vgId;

  CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target));

  switch (reqType) {
    case TDMT_MND_USE_DB: {
      SUseDbOutput* pOut = (SUseDbOutput*)pMsgCtx->out;

      SVgroupInfo vgInfo = {0};
      CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, pOut->dbVgroup, pName, &vgInfo));

      ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag);

      *vgId = vgInfo.vgId;
      CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, tReq));

      return TSDB_CODE_SUCCESS;
    }
    case TDMT_MND_TABLE_META: {
      STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out;

      if (CTG_IS_META_NULL(pOut->metaType)) {
        if (CTG_FLAG_IS_STB(flag)) {
          char dbFName[TSDB_DB_FNAME_LEN] = {0};
          tNameGetFullDbName(pName, dbFName);

          // Jump to _return on failure so the task is ended; a plain return
          // here would leave the task without a completion call.
          CTG_ERR_JRET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache));
          if (NULL != dbCache) {
            SVgroupInfo vgInfo = {0};
            CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pName, &vgInfo));

            ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag);

            *vgId = vgInfo.vgId;
            CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, tReq));

            ctgReleaseVgInfoToCache(pCtg, dbCache);
          } else {
            SBuildUseDBInput input = {0};

            tstrncpy(input.db, dbFName, tListLen(input.db));
            input.vgVersion = CTG_DEFAULT_INVALID_VERSION;

            CTG_ERR_JRET(ctgGetDBVgInfoFromMnode(pCtg, pConn, &input, NULL, tReq));
          }

          return TSDB_CODE_SUCCESS;
        }

        ctgError("no tbmeta got, tbName:%s", tNameGetTableName(pName));
        ctgRemoveTbMetaFromCache(pCtg, pName, false);

        CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
      }

      if (pMsgCtx->lastOut) {
        // Bring back the output parked by the vnode step and give it the
        // tbMeta that just arrived.
        TSWAP(pMsgCtx->out, pMsgCtx->lastOut);
        STableMetaOutput* pLastOut = (STableMetaOutput*)pMsgCtx->out;
        TSWAP(pLastOut->tbMeta, pOut->tbMeta);
      }

      break;
    }
    case TDMT_VND_TABLE_META: {
      STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out;

      if (CTG_IS_META_NULL(pOut->metaType)) {
        ctgError("no tbmeta got, tbNmae:%s", tNameGetTableName(pName));
        ctgRemoveTbMetaFromCache(pCtg, pName, false);
        CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
      }

      if (CTG_FLAG_IS_STB(flag)) {
        break;
      }

      if (CTG_IS_META_TABLE(pOut->metaType) && TSDB_SUPER_TABLE == pOut->tbMeta->tableType) {
        ctgDebug("will continue to refresh tbmeta since got stb, tbName:%s", tNameGetTableName(pName));

        taosMemoryFreeClear(pOut->tbMeta);

        CTG_RET(ctgGetTbMetaFromMnode(pCtg, pConn, pName, NULL, tReq));
      } else if (CTG_IS_META_BOTH(pOut->metaType)) {
        int32_t exist = 0;
        if (!CTG_FLAG_IS_FORCE_UPDATE(flag)) {
          // Try to satisfy the stb part from the cache before asking the mnode.
          SName stbName = *pName;
          strcpy(stbName.tname, pOut->tbName);
          SCtgTbMetaCtx stbCtx = {0};
          stbCtx.flag = flag;
          stbCtx.pName = &stbName;

          taosMemoryFreeClear(pOut->tbMeta);
          CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &stbCtx, &pOut->tbMeta));
          if (pOut->tbMeta) {
            exist = 1;
          }
        }

        if (0 == exist) {
          // Park the current output in lastOut; the mnode response handler
          // swaps it back.
          TSWAP(pMsgCtx->lastOut, pMsgCtx->out);
          CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, pOut->dbFName, pOut->tbName, NULL, tReq));
        }
      }
      break;
    }
    default:
      ctgError("invalid reqType %d", reqType);
      CTG_ERR_JRET(TSDB_CODE_INVALID_MSG);
      break;
  }

  STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out;

  ctgUpdateTbMetaToCache(pCtg, pOut, false);

  if (CTG_IS_META_BOTH(pOut->metaType)) {
    memcpy(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta));
  }

  TSWAP(pTask->res, pOut->tbMeta);

_return:

  if (dbCache) {
    ctgReleaseVgInfoToCache(pCtg, dbCache);
  }

  if (pTask->res || code) {
    ctgHandleTaskEnd(pTask, code);
  }

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1103 1104 1105 1106 1107
/*
 * Response handler for one fetch of a batched table-meta task.
 *
 * tReq->msgIdx selects both the message context and the SCtgFetch entry the
 * response belongs to. The per-request flow mirrors the single-table handler:
 *
 *   TDMT_MND_USE_DB      - resolve the vgroup and issue a vnode meta request.
 *   TDMT_MND_TABLE_META  - an empty answer for a table flagged as stb falls
 *                          back to the vnode (through cached vgroup info or a
 *                          db vgroup request); otherwise an empty answer
 *                          fails this fetch. A parked lastOut is swapped back
 *                          and receives the new tbMeta.
 *   TDMT_VND_TABLE_META  - an empty answer fails this fetch; a super table,
 *                          or a child table whose stb meta is not cached,
 *                          triggers a follow-up mnode request.
 *
 * On a final answer the cache is updated and the meta is stored in the
 * fetch's slot of ctx->pResList. On error the slot records the error code
 * instead. Either way ctx->fetchNum is decremented once; the call that brings
 * it to zero moves pResList into pTask->res and ends the task.
 *
 * Returns TSDB_CODE_SUCCESS when a follow-up request was issued or the fetch
 * completed, otherwise the error code.
 */
int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
  int32_t           code = 0;
  SCtgDBCache*      dbCache = NULL;
  SCtgTask*         pTask = tReq->pTask;
  SCatalog*         pCtg = pTask->pJob->pCtg;
  SRequestConnInfo* pConn = &pTask->pJob->conn;
  SCtgMsgCtx*       pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx);
  SCtgTbMetasCtx*   ctx = (SCtgTbMetasCtx*)pTask->taskCtx;
  SCtgFetch*        pFetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx);
  SName*            pName = ctgGetFetchName(ctx->pNames, pFetch);
  int32_t           flag = pFetch->flag;
  int32_t*          vgId = &pFetch->vgId;
  bool              taskDone = false;

  CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target));

  switch (reqType) {
    case TDMT_MND_USE_DB: {
      SUseDbOutput* pOut = (SUseDbOutput*)pMsgCtx->out;

      SVgroupInfo vgInfo = {0};
      CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, pOut->dbVgroup, pName, &vgInfo));

      ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag);

      *vgId = vgInfo.vgId;
      CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, tReq));

      return TSDB_CODE_SUCCESS;
    }
    case TDMT_MND_TABLE_META: {
      STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out;

      if (CTG_IS_META_NULL(pOut->metaType)) {
        if (CTG_FLAG_IS_STB(flag)) {
          char dbFName[TSDB_DB_FNAME_LEN] = {0};
          tNameGetFullDbName(pName, dbFName);

          // Jump to _return on failure so the error is recorded in this
          // fetch's result slot and fetchNum is decremented; a plain return
          // here would skip both and the task could never complete.
          CTG_ERR_JRET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache));
          if (NULL != dbCache) {
            SVgroupInfo vgInfo = {0};
            CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pName, &vgInfo));

            ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag);

            *vgId = vgInfo.vgId;
            CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, tReq));

            ctgReleaseVgInfoToCache(pCtg, dbCache);
          } else {
            SBuildUseDBInput input = {0};

            tstrncpy(input.db, dbFName, tListLen(input.db));
            input.vgVersion = CTG_DEFAULT_INVALID_VERSION;

            CTG_ERR_JRET(ctgGetDBVgInfoFromMnode(pCtg, pConn, &input, NULL, tReq));
          }

          return TSDB_CODE_SUCCESS;
        }

        ctgError("no tbmeta got, tbName:%s", tNameGetTableName(pName));
        ctgRemoveTbMetaFromCache(pCtg, pName, false);

        CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
      }

      if (pMsgCtx->lastOut) {
        // Bring back the output parked by the vnode step and give it the
        // tbMeta that just arrived.
        TSWAP(pMsgCtx->out, pMsgCtx->lastOut);
        STableMetaOutput* pLastOut = (STableMetaOutput*)pMsgCtx->out;
        TSWAP(pLastOut->tbMeta, pOut->tbMeta);
      }

      break;
    }
    case TDMT_VND_TABLE_META: {
      STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out;

      if (CTG_IS_META_NULL(pOut->metaType)) {
        ctgError("no tbmeta got, tbNmae:%s", tNameGetTableName(pName));
        ctgRemoveTbMetaFromCache(pCtg, pName, false);
        CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
      }

      if (CTG_FLAG_IS_STB(flag)) {
        break;
      }

      if (CTG_IS_META_TABLE(pOut->metaType) && TSDB_SUPER_TABLE == pOut->tbMeta->tableType) {
        ctgDebug("will continue to refresh tbmeta since got stb, tbName:%s", tNameGetTableName(pName));

        taosMemoryFreeClear(pOut->tbMeta);

        CTG_RET(ctgGetTbMetaFromMnode(pCtg, pConn, pName, NULL, tReq));
      } else if (CTG_IS_META_BOTH(pOut->metaType)) {
        int32_t exist = 0;
        if (!CTG_FLAG_IS_FORCE_UPDATE(flag)) {
          // Try to satisfy the stb part from the cache before asking the mnode.
          SName stbName = *pName;
          strcpy(stbName.tname, pOut->tbName);
          SCtgTbMetaCtx stbCtx = {0};
          stbCtx.flag = flag;
          stbCtx.pName = &stbName;

          taosMemoryFreeClear(pOut->tbMeta);
          CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &stbCtx, &pOut->tbMeta));
          if (pOut->tbMeta) {
            ctgDebug("use cached stb meta, tbName:%s", tNameGetTableName(pName));
            exist = 1;
          }
        }

        if (0 == exist) {
          // Park the current output in lastOut; the mnode response handler
          // swaps it back.
          TSWAP(pMsgCtx->lastOut, pMsgCtx->out);
          CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, pOut->dbFName, pOut->tbName, NULL, tReq));
        }
      }
      break;
    }
    default:
      ctgError("invalid reqType %d", reqType);
      CTG_ERR_JRET(TSDB_CODE_INVALID_MSG);
      break;
  }

  STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out;

  ctgUpdateTbMetaToCache(pCtg, pOut, false);

  if (CTG_IS_META_BOTH(pOut->metaType)) {
    memcpy(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta));
  }

  // Move the meta into this fetch's result slot.
  SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->resIdx);
  pRes->code = 0;
  pRes->pRes = pOut->tbMeta;
  pOut->tbMeta = NULL;
  if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) {
    TSWAP(pTask->res, ctx->pResList);
    taskDone = true;
  }

_return:

  if (dbCache) {
    ctgReleaseVgInfoToCache(pCtg, dbCache);
  }

  if (code) {
    // Record the failure for this fetch only; other fetches keep their results.
    SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->resIdx);
    pRes->code = code;
    pRes->pRes = NULL;
    if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) {
      TSWAP(pTask->res, ctx->pResList);
      taskDone = true;
    }
  }

  if (pTask->res && taskDone) {
    ctgHandleTaskEnd(pTask, code);
  }

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1287 1288 1289
/*
 * Response handler for a db-vgroup task.
 *
 * Only TDMT_MND_USE_DB responses are accepted; any other request type is
 * logged and rejected with TSDB_CODE_INVALID_MSG. For a valid response the
 * vgroup list is generated straight into pTask->res, and a clone of the db
 * vgroup info is queued as a cache update for ctx->dbFName.
 *
 * The task is always ended with the resulting code, which is also returned.
 */
int32_t ctgHandleGetDbVgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
  int32_t      code = 0;
  SCtgTask*    pTask = tReq->pTask;
  SCatalog*    pCtg = pTask->pJob->pCtg;
  SCtgDbVgCtx* ctx = (SCtgDbVgCtx*)pTask->taskCtx;

  CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));

  if (TDMT_MND_USE_DB != reqType) {
    ctgError("invalid reqType %d", reqType);
    CTG_ERR_JRET(TSDB_CODE_INVALID_MSG);
  }

  SUseDbOutput* pUseDb = (SUseDbOutput*)pTask->msgCtx.out;
  SDBVgInfo*    pDbClone = NULL;

  CTG_ERR_JRET(ctgGenerateVgList(pCtg, pUseDb->dbVgroup->vgHash, (SArray**)&pTask->res));

  // The cache update gets its own copy of the vgroup info.
  CTG_ERR_JRET(cloneDbVgInfo(pUseDb->dbVgroup, &pDbClone));
  CTG_ERR_JRET(ctgUpdateVgroupEnqueue(pCtg, ctx->dbFName, pUseDb->dbId, pDbClone, false));

_return:

  ctgHandleTaskEnd(pTask, code);

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1320 1321 1322
/*
 * Response handler for a single-table vgroup (hash) lookup task.
 *
 * Only TDMT_MND_USE_DB responses are accepted; any other request type is
 * logged and rejected with TSDB_CODE_INVALID_MSG. For a valid response an
 * SVgroupInfo is allocated as pTask->res and filled from the returned db
 * vgroup info for ctx->pName; the db vgroup info itself is then queued as a
 * cache update and detached from the output (dbVgroup set to NULL).
 *
 * The task is always ended with the resulting code, which is also returned.
 */
int32_t ctgHandleGetTbHashRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
  int32_t        code = 0;
  SCtgTask*      pTask = tReq->pTask;
  SCatalog*      pCtg = pTask->pJob->pCtg;
  SCtgTbHashCtx* ctx = (SCtgTbHashCtx*)pTask->taskCtx;

  CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));

  if (TDMT_MND_USE_DB != reqType) {
    ctgError("invalid reqType %d", reqType);
    CTG_ERR_JRET(TSDB_CODE_INVALID_MSG);
  }

  SUseDbOutput* pUseDb = (SUseDbOutput*)pTask->msgCtx.out;

  pTask->res = taosMemoryMalloc(sizeof(SVgroupInfo));
  if (NULL == pTask->res) {
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, pUseDb->dbVgroup, ctx->pName, (SVgroupInfo*)pTask->res));

  CTG_ERR_JRET(ctgUpdateVgroupEnqueue(pCtg, ctx->dbFName, pUseDb->dbId, pUseDb->dbVgroup, false));
  // Detach the vgroup info from the output once it has been queued.
  pUseDb->dbVgroup = NULL;

_return:

  ctgHandleTaskEnd(pTask, code);

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1357 1358 1359
/*
 * Response handler for one fetch of a batched table-vgroup lookup task.
 *
 * tReq->msgIdx selects the message context and the SCtgFetch entry. Only
 * TDMT_MND_USE_DB responses are accepted. For a valid response the vgroup
 * infos for every table of the fetch's db request are resolved through
 * ctgGetVgInfosFromHashValue, and the db vgroup info is queued as a cache
 * update and detached from the output.
 *
 * On error, every result slot belonging to this fetch (resIdx .. resIdx +
 * number of tables - 1) records the error code. Success and error each
 * decrement ctx->fetchNum once; the call that brings it to zero moves
 * ctx->pResList into pTask->res and ends the task.
 *
 * Returns the resulting code.
 */
int32_t ctgHandleGetTbHashsRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
  int32_t         code = 0;
  SCtgTask*       pTask = tReq->pTask;
  SCtgTbHashsCtx* ctx = (SCtgTbHashsCtx*)pTask->taskCtx;
  SCatalog*       pCtg = pTask->pJob->pCtg;
  SCtgMsgCtx*     pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx);
  SCtgFetch*      pFetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx);
  bool            taskDone = false;

  CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target));

  if (TDMT_MND_USE_DB != reqType) {
    ctgError("invalid reqType %d", reqType);
    CTG_ERR_JRET(TSDB_CODE_INVALID_MSG);
  }

  SUseDbOutput* pUseDb = (SUseDbOutput*)pMsgCtx->out;
  STablesReq*   pDbReq = taosArrayGet(ctx->pNames, pFetch->dbIdx);

  CTG_ERR_JRET(ctgGetVgInfosFromHashValue(pCtg, tReq, pUseDb->dbVgroup, ctx, pMsgCtx->target, pDbReq->pTables, true));

  CTG_ERR_JRET(ctgUpdateVgroupEnqueue(pCtg, pMsgCtx->target, pUseDb->dbId, pUseDb->dbVgroup, false));
  // Detach the vgroup info from the output once it has been queued.
  pUseDb->dbVgroup = NULL;

  if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) {
    TSWAP(pTask->res, ctx->pResList);
    taskDone = true;
  }

_return:

  if (code) {
    // Mark every table of this fetch as failed.
    STablesReq* pFailedReq = taosArrayGet(ctx->pNames, pFetch->dbIdx);
    int32_t     tbNum = taosArrayGetSize(pFailedReq->pTables);
    for (int32_t k = 0; k < tbNum; ++k) {
      SMetaRes* pSlot = taosArrayGet(ctx->pResList, pFetch->resIdx + k);
      pSlot->code = code;
      pSlot->pRes = NULL;
    }

    if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) {
      TSWAP(pTask->res, ctx->pResList);
      taskDone = true;
    }
  }

  if (pTask->res && taskDone) {
    ctgHandleTaskEnd(pTask, code);
  }

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1415 1416
/*
 * Response handler for a table-index task.
 *
 * After decoding the response, the index array is cloned into pTask->res and
 * the decoded STableIndex is handed to ctgUpdateTbIndexEnqueue by address, so
 * that call can modify pTask->msgCtx.out.
 *
 * TSDB_CODE_MND_DB_INDEX_NOT_EXIST is not treated as a failure: it is mapped
 * to TSDB_CODE_SUCCESS before the task is ended.
 *
 * The task is always ended with the resulting code, which is also returned.
 */
int32_t ctgHandleGetTbIndexRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
  int32_t   code = 0;
  SCtgTask* pTask = tReq->pTask;
  CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));

  STableIndex* pOut = (STableIndex*)pTask->msgCtx.out;
  SArray*      pInfo = NULL;
  CTG_ERR_JRET(ctgCloneTableIndex(pOut->pIndex, &pInfo));
  pTask->res = pInfo;

  CTG_ERR_JRET(ctgUpdateTbIndexEnqueue(pTask->pJob->pCtg, (STableIndex**)&pTask->msgCtx.out, false));

_return:

  if (TSDB_CODE_MND_DB_INDEX_NOT_EXIST == code) {
    code = TSDB_CODE_SUCCESS;
  }
  ctgHandleTaskEnd(pTask, code);

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1438 1439
/*
 * Response handler for a table-config task.
 *
 * The response is decoded with the ADDRESS of pTask->msgCtx.out as the output
 * argument (NOTE(review): presumably the decoder stores a newly allocated
 * result through it - confirm against ctgProcessRspMsg). On success the
 * output is swapped into pTask->res.
 *
 * The task is always ended with the resulting code, which is also returned.
 */
int32_t ctgHandleGetTbCfgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
  SCtgTask* pTask = tReq->pTask;
  int32_t   code = 0;

  CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));

  // Hand the decoded output over as the task result.
  TSWAP(pTask->res, pTask->msgCtx.out);

_return:

  ctgHandleTaskEnd(pTask, code);

  CTG_RET(code);
}
D
dapan1121 已提交
1451

dengyihao's avatar
dengyihao 已提交
1452 1453
/*
 * Response handler for a db-config task.
 *
 * The response is decoded into pTask->msgCtx.out; on success that output is
 * swapped into pTask->res.
 *
 * The task is always ended with the resulting code, which is also returned.
 */
int32_t ctgHandleGetDbCfgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
  SCtgTask* pTask = tReq->pTask;
  int32_t   code = 0;

  CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));

  // Hand the decoded output over as the task result.
  TSWAP(pTask->res, pTask->msgCtx.out);

_return:

  ctgHandleTaskEnd(pTask, code);

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1466
/*
 * Response handler slot for db-info tasks.
 *
 * No processing is done: none of the arguments are read and the call always
 * reports TSDB_CODE_APP_ERROR.
 * NOTE(review): presumably db-info tasks never issue a request that would be
 * answered through this handler - confirm.
 */
int32_t ctgHandleGetDbInfoRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
  CTG_RET(TSDB_CODE_APP_ERROR);
}

dengyihao's avatar
dengyihao 已提交
1470 1471
/*
 * Response handler for a qnode-list task.
 *
 * The response is decoded into pTask->msgCtx.out; on success that output is
 * swapped into pTask->res.
 *
 * The task is always ended with the resulting code, which is also returned.
 */
int32_t ctgHandleGetQnodeRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
  SCtgTask* pTask = tReq->pTask;
  int32_t   code = 0;

  CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));

  // Hand the decoded output over as the task result.
  TSWAP(pTask->res, pTask->msgCtx.out);

_return:

  ctgHandleTaskEnd(pTask, code);

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1484 1485
/*
 * Response handler for a dnode-list task.
 *
 * The response is decoded with the ADDRESS of pTask->msgCtx.out as the output
 * argument (NOTE(review): presumably the decoder stores a newly allocated
 * result through it - confirm against ctgProcessRspMsg). On success the
 * output is swapped into pTask->res.
 *
 * The task is always ended with the resulting code, which is also returned.
 */
int32_t ctgHandleGetDnodeRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
  SCtgTask* pTask = tReq->pTask;
  int32_t   code = 0;

  CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));

  // Hand the decoded output over as the task result.
  TSWAP(pTask->res, pTask->msgCtx.out);

_return:

  ctgHandleTaskEnd(pTask, code);

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1498 1499
/*
 * Response handler for an index-info task.
 *
 * The response is decoded into pTask->msgCtx.out; on success that output is
 * swapped into pTask->res.
 *
 * The task is always ended with the resulting code, which is also returned.
 */
int32_t ctgHandleGetIndexRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
  SCtgTask* pTask = tReq->pTask;
  int32_t   code = 0;

  CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));

  // Hand the decoded output over as the task result.
  TSWAP(pTask->res, pTask->msgCtx.out);

_return:

  ctgHandleTaskEnd(pTask, code);

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1512 1513
/*
 * Response handler for a UDF-info task.
 *
 * The response is decoded into pTask->msgCtx.out; on success that output is
 * swapped into pTask->res.
 *
 * The task is always ended with the resulting code, which is also returned.
 */
int32_t ctgHandleGetUdfRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
  SCtgTask* pTask = tReq->pTask;
  int32_t   code = 0;

  CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));

  // Hand the decoded output over as the task result.
  TSWAP(pTask->res, pTask->msgCtx.out);

_return:

  ctgHandleTaskEnd(pTask, code);

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1526 1527 1528 1529 1530 1531
/*
 * Response handler for a user-authorisation task.
 *
 * After decoding the response into pTask->msgCtx.out, access to
 * ctx->user.dbFName is granted when any of the following holds, checked in
 * this order:
 *   1. the user has superAuth;
 *   2. the db is in createdDbs;
 *   3. a read request and the db is in readDbs;
 *   4. a write request and the db is in writeDbs.
 *
 * If no error occurred the verdict is stored as a heap-allocated bool in
 * pTask->res. Regardless of the outcome the auth response is passed to
 * ctgUpdateUserEnqueue, msgCtx.out is freed and the task is ended.
 *
 * Returns the resulting code.
 */
int32_t ctgHandleGetUserRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
  int32_t          code = 0;
  SCtgTask*        pTask = tReq->pTask;
  SCatalog*        pCtg = pTask->pJob->pCtg;
  SCtgUserCtx*     ctx = (SCtgUserCtx*)pTask->taskCtx;
  SGetUserAuthRsp* pOut = (SGetUserAuthRsp*)pTask->msgCtx.out;
  bool             pass = false;

  CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));

  if (pOut->superAuth) {
    pass = true;
  } else if (pOut->createdDbs && taosHashGet(pOut->createdDbs, ctx->user.dbFName, strlen(ctx->user.dbFName))) {
    pass = true;
  } else if (ctx->user.type == AUTH_TYPE_READ && pOut->readDbs &&
             taosHashGet(pOut->readDbs, ctx->user.dbFName, strlen(ctx->user.dbFName))) {
    pass = true;
  } else if (ctx->user.type == AUTH_TYPE_WRITE && pOut->writeDbs &&
             taosHashGet(pOut->writeDbs, ctx->user.dbFName, strlen(ctx->user.dbFName))) {
    pass = true;
  }

_return:

  if (TSDB_CODE_SUCCESS == code) {
    pTask->res = taosMemoryCalloc(1, sizeof(bool));
    if (NULL != pTask->res) {
      *(bool*)pTask->res = pass;
    } else {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  // NOTE(review): this also runs when decoding failed, so pOut may not hold a
  // decoded response at this point - confirm that is intended.
  ctgUpdateUserEnqueue(pCtg, pOut, false);
  taosMemoryFreeClear(pTask->msgCtx.out);

  ctgHandleTaskEnd(pTask, code);

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1573 1574
/*
 * Response handler for a server-version task.
 *
 * The response is decoded with the ADDRESS of pTask->msgCtx.out as the output
 * argument (NOTE(review): presumably the decoder stores a newly allocated
 * result through it - confirm against ctgProcessRspMsg). On success the
 * output is swapped into pTask->res.
 *
 * The task is always ended with the resulting code, which is also returned.
 */
int32_t ctgHandleGetSvrVerRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
  SCtgTask* pTask = tReq->pTask;
  int32_t   code = 0;

  CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));

  // Hand the decoded output over as the task result.
  TSWAP(pTask->res, pTask->msgCtx.out);

_return:

  ctgHandleTaskEnd(pTask, code);

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1588 1589 1590
/*
 * Issue the first request needed to refresh a table's meta.
 *
 * The request chosen depends on `flag` and on the vgroup cache:
 *   - system-db table:       ask the mnode directly by db/table name;
 *   - table flagged as stb:  ask the mnode;
 *   - otherwise, if vgroup info for the db is cached: resolve the table's
 *     vgroup, store its id in *vgId and ask that vnode;
 *   - otherwise: request the db's vgroup info from the mnode first.
 *
 * Any vgroup cache entry acquired here is released before returning.
 *
 * Returns the code of the request that was issued, or the error that
 * prevented issuing one.
 */
int32_t ctgAsyncRefreshTbMeta(SCtgTaskReq* tReq, int32_t flag, SName* pName, int32_t* vgId) {
  int32_t           code = 0;
  SCtgTask*         pTask = tReq->pTask;
  SCatalog*         pCtg = pTask->pJob->pCtg;
  SRequestConnInfo* pConn = &pTask->pJob->conn;

  if (CTG_FLAG_IS_SYS_DB(flag)) {
    ctgDebug("will refresh sys db tbmeta, tbName:%s", tNameGetTableName(pName));

    CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, (char*)pName->dbname, (char*)pName->tname, NULL, tReq));
  }

  if (CTG_FLAG_IS_STB(flag)) {
    ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s", tNameGetTableName(pName));

    // A failed mnode request is final here; the vnode is not tried instead.
    CTG_RET(ctgGetTbMetaFromMnode(pCtg, pConn, pName, NULL, tReq));
  }

  char dbFName[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pName, dbFName);

  SCtgDBCache* dbCache = NULL;
  CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache));

  if (NULL == dbCache) {
    // No cached vgroup info for this db: fetch it from the mnode first.
    SBuildUseDBInput useDbInput = {0};

    tstrncpy(useDbInput.db, dbFName, tListLen(useDbInput.db));
    useDbInput.vgVersion = CTG_DEFAULT_INVALID_VERSION;

    CTG_ERR_JRET(ctgGetDBVgInfoFromMnode(pCtg, pConn, &useDbInput, NULL, tReq));
  } else {
    SVgroupInfo vg = {0};
    CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pName, &vg));

    ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag);

    *vgId = vg.vgId;
    CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vg, NULL, tReq));
  }

_return:

  if (dbCache) {
    ctgReleaseVgInfoToCache(pCtg, dbCache);
  }

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1638 1639
/*
 * Launch a single table-meta task.
 *
 * The task's message context inherits the job's pBatchs when it has none. The cache is
 * consulted first via ctgGetTbMetaFromCache(); if that produced a result the task is
 * finished immediately with code 0, otherwise an asynchronous refresh is issued through
 * ctgAsyncRefreshTbMeta() with msgIdx -1.
 */
int32_t ctgLaunchGetTbMetaTask(SCtgTask* pTask) {
  SCtgJob*          pJob = pTask->pJob;
  SCatalog*         pCtg = pJob->pCtg;
  SRequestConnInfo* pConn = &pJob->conn;
  SCtgTbMetaCtx*    pCtx = (SCtgTbMetaCtx*)pTask->taskCtx;
  SCtgMsgCtx*       pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);

  if (NULL == pMsgCtx->pBatchs) {
    pMsgCtx->pBatchs = pJob->pBatchs;
  }

  CTG_ERR_RET(ctgGetTbMetaFromCache(pCtg, pConn, pCtx, (STableMeta**)&pTask->res));
  if (NULL != pTask->res) {
    // served from cache, nothing to request
    CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0));
    return TSDB_CODE_SUCCESS;
  }

  SCtgTaskReq tReq;
  tReq.msgIdx = -1;
  tReq.pTask = pTask;
  CTG_ERR_RET(ctgAsyncRefreshTbMeta(&tReq, pCtx->flag, pCtx->pName, &pCtx->vgId));

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1662 1663
/*
 * Launch a multi-table meta task.
 *
 * For every STablesReq entry in pCtx->pNames the cache is queried through
 * ctgGetTbMetasFromCache(); baseResIdx advances by the number of tables of each entry.
 * If no fetch was recorded in pCtx->pFetchs, pCtx->pResList becomes the task result and
 * the task is finished with code 0. Otherwise one message context per fetch is created
 * in pTask->msgCtxs and ctgAsyncRefreshTbMeta() is called for each fetch.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when the message-context array
 * cannot be allocated, or the first error reported by a callee.
 */
int32_t ctgLaunchGetTbMetasTask(SCtgTask* pTask) {
  SCatalog*         pCtg = pTask->pJob->pCtg;
  SRequestConnInfo* pConn = &pTask->pJob->conn;
  SCtgTbMetasCtx*   pCtx = (SCtgTbMetasCtx*)pTask->taskCtx;
  SCtgJob*          pJob = pTask->pJob;

  int32_t dbNum = taosArrayGetSize(pCtx->pNames);
  int32_t fetchIdx = 0;
  int32_t baseResIdx = 0;
  for (int32_t i = 0; i < dbNum; ++i) {
    STablesReq* pReq = taosArrayGet(pCtx->pNames, i);
    // explicit cast keeps the argument type in line with the %d conversion
    ctgDebug("start to check tb metas in db %s, tbNum %d", pReq->dbFName, (int32_t)taosArrayGetSize(pReq->pTables));
    CTG_ERR_RET(ctgGetTbMetasFromCache(pCtg, pConn, pCtx, i, &fetchIdx, baseResIdx, pReq->pTables));
    baseResIdx += taosArrayGetSize(pReq->pTables);
  }

  pCtx->fetchNum = taosArrayGetSize(pCtx->pFetchs);
  if (pCtx->fetchNum <= 0) {
    // nothing left to fetch: hand the collected list over as the result
    TSWAP(pTask->res, pCtx->pResList);

    CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0));
    return TSDB_CODE_SUCCESS;
  }

  pTask->msgCtxs = taosArrayInit(pCtx->fetchNum, sizeof(SCtgMsgCtx));
  if (NULL == pTask->msgCtxs) {
    // do not touch a NULL array below
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }
  taosArraySetSize(pTask->msgCtxs, pCtx->fetchNum);

  for (int32_t i = 0; i < pCtx->fetchNum; ++i) {
    SCtgFetch*  pFetch = taosArrayGet(pCtx->pFetchs, i);
    SName*      pName = ctgGetFetchName(pCtx->pNames, pFetch);
    SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, i);
    if (NULL == pMsgCtx->pBatchs) {
      pMsgCtx->pBatchs = pJob->pBatchs;
    }

    SCtgTaskReq tReq;
    tReq.pTask = pTask;
    tReq.msgIdx = pFetch->fetchIdx;
    CTG_ERR_RET(ctgAsyncRefreshTbMeta(&tReq, pFetch->flag, pName, &pFetch->vgId));
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1706 1707 1708
/*
 * Launch a db-vgroup task.
 *
 * When the db's vgroup info is available in the cache, the vgroup list is generated into
 * pTask->res, the cache entry is released and the task is finished with code 0. When it
 * is not cached, ctgGetDBVgInfoFromMnode() is called for the db with msgIdx -1.
 */
int32_t ctgLaunchGetDbVgTask(SCtgTask* pTask) {
  int32_t           code = 0;
  SCtgJob*          pJob = pTask->pJob;
  SCatalog*         pCtg = pJob->pCtg;
  SRequestConnInfo* pConn = &pJob->conn;
  SCtgDbVgCtx*      pCtx = (SCtgDbVgCtx*)pTask->taskCtx;
  SCtgDBCache*      pDbCache = NULL;
  SCtgMsgCtx*       pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);

  if (NULL == pMsgCtx->pBatchs) {
    pMsgCtx->pBatchs = pJob->pBatchs;
  }

  CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, pCtx->dbFName, &pDbCache));
  if (NULL == pDbCache) {
    // cache miss: request the vgroup info for this db
    SBuildUseDBInput useDbReq = {0};

    tstrncpy(useDbReq.db, pCtx->dbFName, tListLen(useDbReq.db));
    useDbReq.vgVersion = CTG_DEFAULT_INVALID_VERSION;

    SCtgTaskReq tReq;
    tReq.msgIdx = -1;
    tReq.pTask = pTask;
    CTG_ERR_RET(ctgGetDBVgInfoFromMnode(pCtg, pConn, &useDbReq, NULL, &tReq));
  } else {
    CTG_ERR_JRET(ctgGenerateVgList(pCtg, pDbCache->vgCache.vgInfo->vgHash, (SArray**)&pTask->res));

    // release before finishing the task; clear the pointer so _return does not release twice
    ctgReleaseVgInfoToCache(pCtg, pDbCache);
    pDbCache = NULL;

    CTG_ERR_JRET(ctgHandleTaskEnd(pTask, 0));
  }

_return:

  if (pDbCache) {
    ctgReleaseVgInfoToCache(pCtg, pDbCache);
  }

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1747 1748 1749
/*
 * Launch a table-hash (table -> vgroup) task.
 *
 * With cached vgroup info for the db, an SVgroupInfo is allocated as pTask->res and filled
 * by ctgGetVgInfoFromHashValue(); the cache entry is then released and the task finished
 * with code 0. Without cached info, ctgGetDBVgInfoFromMnode() is called with msgIdx -1.
 */
int32_t ctgLaunchGetTbHashTask(SCtgTask* pTask) {
  int32_t           code = 0;
  SCtgJob*          pJob = pTask->pJob;
  SCatalog*         pCtg = pJob->pCtg;
  SRequestConnInfo* pConn = &pJob->conn;
  SCtgTbHashCtx*    pCtx = (SCtgTbHashCtx*)pTask->taskCtx;
  SCtgDBCache*      pDbCache = NULL;
  SCtgMsgCtx*       pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);

  if (NULL == pMsgCtx->pBatchs) {
    pMsgCtx->pBatchs = pJob->pBatchs;
  }

  CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, pCtx->dbFName, &pDbCache));
  if (NULL == pDbCache) {
    // cache miss: request the vgroup info for this db
    SBuildUseDBInput useDbReq = {0};

    tstrncpy(useDbReq.db, pCtx->dbFName, tListLen(useDbReq.db));
    useDbReq.vgVersion = CTG_DEFAULT_INVALID_VERSION;

    SCtgTaskReq tReq;
    tReq.msgIdx = -1;
    tReq.pTask = pTask;
    CTG_ERR_RET(ctgGetDBVgInfoFromMnode(pCtg, pConn, &useDbReq, NULL, &tReq));
  } else {
    pTask->res = taosMemoryMalloc(sizeof(SVgroupInfo));
    if (NULL == pTask->res) {
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }
    CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, pDbCache->vgCache.vgInfo, pCtx->pName, (SVgroupInfo*)pTask->res));

    // release before finishing the task; clear the pointer so _return does not release twice
    ctgReleaseVgInfoToCache(pCtg, pDbCache);
    pDbCache = NULL;

    CTG_ERR_JRET(ctgHandleTaskEnd(pTask, 0));
  }

_return:

  if (pDbCache) {
    ctgReleaseVgInfoToCache(pCtg, pDbCache);
  }

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1792 1793
/*
 * Launch a multi-table hash (table -> vgroup) task.
 *
 * Pass 1 walks pCtx->pNames. For a db whose vgroup info is cached,
 * ctgGetVgInfosFromHashValue() is called for its tables; for an uncached db a fetch is
 * recorded with ctgAddFetch() and pCtx->pResList is resized to cover that db's tables.
 * If no fetch was recorded, pCtx->pResList becomes the task result and the task is
 * finished with code 0.
 *
 * Pass 2 creates one message context per fetch in pTask->msgCtxs and calls
 * ctgGetDBVgInfoFromMnode() for each fetch's db.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY when the message-context array cannot be allocated,
 * otherwise the first error of a callee or success.
 */
int32_t ctgLaunchGetTbHashsTask(SCtgTask* pTask) {
  SCatalog*         pCtg = pTask->pJob->pCtg;
  SRequestConnInfo* pConn = &pTask->pJob->conn;
  SCtgTbHashsCtx*   pCtx = (SCtgTbHashsCtx*)pTask->taskCtx;
  SCtgDBCache*      dbCache = NULL;
  SCtgJob*          pJob = pTask->pJob;
  int32_t           dbNum = taosArrayGetSize(pCtx->pNames);
  int32_t           fetchIdx = 0;
  int32_t           baseResIdx = 0;
  int32_t           code = 0;

  for (int32_t i = 0; i < dbNum; ++i) {
    STablesReq* pReq = taosArrayGet(pCtx->pNames, i);

    CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, pReq->dbFName, &dbCache));

    if (NULL != dbCache) {
      SCtgTaskReq tReq;
      tReq.pTask = pTask;
      tReq.msgIdx = -1;
      CTG_ERR_JRET(
          ctgGetVgInfosFromHashValue(pCtg, &tReq, dbCache->vgCache.vgInfo, pCtx, pReq->dbFName, pReq->pTables, false));

      // clear the pointer after release so the next iteration / _return starts clean
      ctgReleaseVgInfoToCache(pCtg, dbCache);
      dbCache = NULL;

      baseResIdx += taosArrayGetSize(pReq->pTables);
    } else {
      ctgAddFetch(&pCtx->pFetchs, i, -1, &fetchIdx, baseResIdx, 0);

      baseResIdx += taosArrayGetSize(pReq->pTables);
      taosArraySetSize(pCtx->pResList, baseResIdx);
    }
  }

  pCtx->fetchNum = taosArrayGetSize(pCtx->pFetchs);
  if (pCtx->fetchNum <= 0) {
    TSWAP(pTask->res, pCtx->pResList);

    CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0));
    return TSDB_CODE_SUCCESS;
  }

  pTask->msgCtxs = taosArrayInit(pCtx->fetchNum, sizeof(SCtgMsgCtx));
  if (NULL == pTask->msgCtxs) {
    // no cache entry is held at this point, so a direct return is safe
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }
  taosArraySetSize(pTask->msgCtxs, pCtx->fetchNum);

  for (int32_t i = 0; i < pCtx->fetchNum; ++i) {
    SCtgFetch*  pFetch = taosArrayGet(pCtx->pFetchs, i);
    STablesReq* pReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx);
    SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, i);
    if (NULL == pMsgCtx->pBatchs) {
      pMsgCtx->pBatchs = pJob->pBatchs;
    }

    SBuildUseDBInput input = {0};
    // bounded copy, matching the other launch functions in this file
    tstrncpy(input.db, pReq->dbFName, tListLen(input.db));

    input.vgVersion = CTG_DEFAULT_INVALID_VERSION;

    SCtgTaskReq tReq;
    tReq.pTask = pTask;
    tReq.msgIdx = pFetch->fetchIdx;
    CTG_ERR_RET(ctgGetDBVgInfoFromMnode(pCtg, pConn, &input, NULL, &tReq));
  }

_return:

  if (dbCache) {
    ctgReleaseVgInfoToCache(pCtg, dbCache);
  }

  return code;
}

dengyihao's avatar
dengyihao 已提交
1866 1867 1868
/*
 * Launch a table-index task.
 *
 * The table index list is first read from the cache via ctgReadTbIndexFromCache(). A
 * non-NULL list becomes the task result and the task is finished with code 0; otherwise
 * ctgGetTbIndexFromMnode() is called.
 */
int32_t ctgLaunchGetTbIndexTask(SCtgTask* pTask) {
  SCatalog*         pCtg = pTask->pJob->pCtg;
  SRequestConnInfo* pConn = &pTask->pJob->conn;
  SCtgTbIndexCtx*   pCtx = (SCtgTbIndexCtx*)pTask->taskCtx;
  SArray*           pRes = NULL;
  SCtgJob*          pJob = pTask->pJob;
  SCtgMsgCtx*       pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
  if (NULL == pMsgCtx->pBatchs) {
    pMsgCtx->pBatchs = pJob->pBatchs;
  }

  CTG_ERR_RET(ctgReadTbIndexFromCache(pCtg, pCtx->pName, &pRes));
  if (pRes) {
    // cache hit: the list read from the cache is the task result
    pTask->res = pRes;

    CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0));
    return TSDB_CODE_SUCCESS;
  }

  CTG_ERR_RET(ctgGetTbIndexFromMnode(pCtg, pConn, pCtx->pName, NULL, pTask));
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1890 1891 1892
/*
 * Launch (or re-enter, from ctgGetTbCfgCb) a table-config task.
 *
 * Steps:
 *   - If pCtx->tbType is not known (<= 0) it is read from the cache; if still unknown a
 *     CTG_TASK_GET_TB_META sub task is launched and the function returns, to be resumed
 *     through ctgGetTbCfgCb.
 *   - For TSDB_SUPER_TABLE, ctgGetTableCfgFromMnode() is called.
 *   - For other table types the vgroup is needed: it is taken from pCtx->pVgInfo, then
 *     from the cache, and failing that a CTG_TASK_GET_DB_VGROUP sub task is launched.
 *     With a vgroup, ctgGetTableCfgFromVnode() is called.
 *
 * On error the task is ended with the error code only if its status is already
 * CTG_TASK_LAUNCHED; the error code is returned either way.
 */
int32_t ctgLaunchGetTbCfgTask(SCtgTask* pTask) {
  int32_t           code = 0;
  SCatalog*         pCtg = pTask->pJob->pCtg;
  SRequestConnInfo* pConn = &pTask->pJob->conn;
  SCtgTbCfgCtx*     pCtx = (SCtgTbCfgCtx*)pTask->taskCtx;
  char              dbFName[TSDB_DB_FNAME_LEN] = {0};
  tNameGetFullDbName(pCtx->pName, dbFName);
  SCtgJob*    pJob = pTask->pJob;
  SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
  if (NULL == pMsgCtx->pBatchs) {
    pMsgCtx->pBatchs = pJob->pBatchs;
  }

  if (pCtx->tbType <= 0) {
    CTG_ERR_JRET(ctgReadTbTypeFromCache(pCtg, dbFName, pCtx->pName->tname, &pCtx->tbType));
    if (pCtx->tbType <= 0) {
      // table type unknown: resolve it through a table-meta sub task first
      CTG_ERR_JRET(ctgLaunchSubTask(pTask, CTG_TASK_GET_TB_META, ctgGetTbCfgCb, pCtx->pName));
      return TSDB_CODE_SUCCESS;
    }
  }

  if (TSDB_SUPER_TABLE == pCtx->tbType) {
    CTG_ERR_JRET(ctgGetTableCfgFromMnode(pCtg, pConn, pCtx->pName, NULL, pTask));
  } else {
    if (NULL == pCtx->pVgInfo) {
      CTG_ERR_JRET(ctgGetTbHashVgroupFromCache(pCtg, pCtx->pName, &pCtx->pVgInfo));
      if (NULL == pCtx->pVgInfo) {
        // vgroup unknown: resolve it through a db-vgroup sub task first
        CTG_ERR_JRET(ctgLaunchSubTask(pTask, CTG_TASK_GET_DB_VGROUP, ctgGetTbCfgCb, dbFName));
        return TSDB_CODE_SUCCESS;
      }
    }

    CTG_ERR_JRET(ctgGetTableCfgFromVnode(pCtg, pConn, pCtx->pName, pCtx->pVgInfo, NULL, pTask));
  }

  return TSDB_CODE_SUCCESS;

_return:

  if (CTG_TASK_LAUNCHED == pTask->status) {
    ctgHandleTaskEnd(pTask, code);
  }

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
1937 1938
/*
 * Launch the qnode-list task: the task's message context inherits the job's pBatchs when
 * it has none, then ctgGetQnodeListFromMnode() is called.
 */
int32_t ctgLaunchGetQnodeTask(SCtgTask* pTask) {
  SCtgJob*          pJob = pTask->pJob;
  SCatalog*         pCtg = pJob->pCtg;
  SRequestConnInfo* pConn = &pJob->conn;
  SCtgMsgCtx*       pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);

  if (NULL == pMsgCtx->pBatchs) {
    pMsgCtx->pBatchs = pJob->pBatchs;
  }

  CTG_ERR_RET(ctgGetQnodeListFromMnode(pCtg, pConn, NULL, pTask));

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1950 1951
/*
 * Launch the dnode-list task: the task's message context inherits the job's pBatchs when
 * it has none, then ctgGetDnodeListFromMnode() is called.
 */
int32_t ctgLaunchGetDnodeTask(SCtgTask* pTask) {
  SCtgJob*          pJob = pTask->pJob;
  SCatalog*         pCtg = pJob->pCtg;
  SRequestConnInfo* pConn = &pJob->conn;
  SCtgMsgCtx*       pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);

  if (NULL == pMsgCtx->pBatchs) {
    pMsgCtx->pBatchs = pJob->pBatchs;
  }

  CTG_ERR_RET(ctgGetDnodeListFromMnode(pCtg, pConn, NULL, pTask));

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1963 1964
/*
 * Launch the db-config task: the task's message context inherits the job's pBatchs when
 * it has none, then ctgGetDBCfgFromMnode() is called for pCtx->dbFName.
 */
int32_t ctgLaunchGetDbCfgTask(SCtgTask* pTask) {
  SCtgJob*          pJob = pTask->pJob;
  SCatalog*         pCtg = pJob->pCtg;
  SRequestConnInfo* pConn = &pJob->conn;
  SCtgDbCfgCtx*     pCtx = (SCtgDbCfgCtx*)pTask->taskCtx;
  SCtgMsgCtx*       pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);

  if (NULL == pMsgCtx->pBatchs) {
    pMsgCtx->pBatchs = pJob->pBatchs;
  }

  CTG_ERR_RET(ctgGetDBCfgFromMnode(pCtg, pConn, pCtx->dbFName, NULL, pTask));

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1978 1979 1980 1981
/*
 * Launch the db-info task. This task is answered from the cache only.
 *
 * An SDbInfo is allocated as pTask->res. With cached vgroup info for the db, vgVer, dbId
 * and tbNum are copied from the cache entry; without it, vgVer is set to
 * CTG_DEFAULT_INVALID_VERSION and the other fields keep their zero-initialised values.
 * The task is then finished with code 0.
 */
int32_t ctgLaunchGetDbInfoTask(SCtgTask* pTask) {
  int32_t        code = 0;
  SCtgJob*       pJob = pTask->pJob;
  SCatalog*      pCtg = pJob->pCtg;
  SCtgDbInfoCtx* pCtx = (SCtgDbInfoCtx*)pTask->taskCtx;
  SCtgDBCache*   pDbCache = NULL;
  SCtgMsgCtx*    pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);

  if (NULL == pMsgCtx->pBatchs) {
    pMsgCtx->pBatchs = pJob->pBatchs;
  }

  pTask->res = taosMemoryCalloc(1, sizeof(SDbInfo));
  if (NULL == pTask->res) {
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  SDbInfo* pDbInfo = (SDbInfo*)pTask->res;
  CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, pCtx->dbFName, &pDbCache));
  if (NULL == pDbCache) {
    pDbInfo->vgVer = CTG_DEFAULT_INVALID_VERSION;
  } else {
    pDbInfo->vgVer = pDbCache->vgCache.vgInfo->vgVersion;
    pDbInfo->dbId = pDbCache->dbId;
    pDbInfo->tbNum = pDbCache->vgCache.vgInfo->numOfTable;

    // clear the pointer after release so _return does not release twice
    ctgReleaseVgInfoToCache(pCtg, pDbCache);
    pDbCache = NULL;
  }

  CTG_ERR_JRET(ctgHandleTaskEnd(pTask, 0));

_return:

  if (pDbCache) {
    ctgReleaseVgInfoToCache(pCtg, pDbCache);
  }

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
2018 2019
/*
 * Launch the index-info task: the task's message context inherits the job's pBatchs when
 * it has none, then ctgGetIndexInfoFromMnode() is called for pCtx->indexFName.
 */
int32_t ctgLaunchGetIndexTask(SCtgTask* pTask) {
  SCtgJob*          pJob = pTask->pJob;
  SCatalog*         pCtg = pJob->pCtg;
  SRequestConnInfo* pConn = &pJob->conn;
  SCtgIndexCtx*     pCtx = (SCtgIndexCtx*)pTask->taskCtx;
  SCtgMsgCtx*       pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);

  if (NULL == pMsgCtx->pBatchs) {
    pMsgCtx->pBatchs = pJob->pBatchs;
  }

  CTG_ERR_RET(ctgGetIndexInfoFromMnode(pCtg, pConn, pCtx->indexFName, NULL, pTask));

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2033 2034
/*
 * Launch the UDF-info task: the task's message context inherits the job's pBatchs when
 * it has none, then ctgGetUdfInfoFromMnode() is called for pCtx->udfName.
 */
int32_t ctgLaunchGetUdfTask(SCtgTask* pTask) {
  SCtgJob*          pJob = pTask->pJob;
  SCatalog*         pCtg = pJob->pCtg;
  SRequestConnInfo* pConn = &pJob->conn;
  SCtgUdfCtx*       pCtx = (SCtgUdfCtx*)pTask->taskCtx;
  SCtgMsgCtx*       pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);

  if (NULL == pMsgCtx->pBatchs) {
    pMsgCtx->pBatchs = pJob->pBatchs;
  }

  CTG_ERR_RET(ctgGetUdfInfoFromMnode(pCtg, pConn, pCtx->udfName, NULL, pTask));

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2048 2049
/*
 * Launch the user-auth task.
 *
 * ctgChkAuthFromCache() reports whether the user's auth info is cached (inCache) and, if
 * so, the check outcome (pass). When cached, a bool holding that outcome is allocated as
 * pTask->res and the task is finished with code 0. When not cached,
 * ctgGetUserDbAuthFromMnode() is called for the user.
 */
int32_t ctgLaunchGetUserTask(SCtgTask* pTask) {
  SCtgJob*          pJob = pTask->pJob;
  SCatalog*         pCtg = pJob->pCtg;
  SRequestConnInfo* pConn = &pJob->conn;
  SCtgUserCtx*      pCtx = (SCtgUserCtx*)pTask->taskCtx;
  SCtgMsgCtx*       pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
  bool              inCache = false;
  bool              pass = false;

  if (NULL == pMsgCtx->pBatchs) {
    pMsgCtx->pBatchs = pJob->pBatchs;
  }

  CTG_ERR_RET(ctgChkAuthFromCache(pCtg, pCtx->user.user, pCtx->user.dbFName, pCtx->user.type, &inCache, &pass));
  if (!inCache) {
    CTG_ERR_RET(ctgGetUserDbAuthFromMnode(pCtg, pConn, pCtx->user.user, NULL, pTask));

    return TSDB_CODE_SUCCESS;
  }

  // answer directly from the cached check
  pTask->res = taosMemoryCalloc(1, sizeof(bool));
  if (NULL == pTask->res) {
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }
  *(bool*)pTask->res = pass;

  CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0));
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2077 2078
/*
 * Launch the server-version task: the task's message context inherits the job's pBatchs
 * when it has none, then ctgGetSvrVerFromMnode() is called.
 */
int32_t ctgLaunchGetSvrVerTask(SCtgTask* pTask) {
  SCtgJob*          pJob = pTask->pJob;
  SCatalog*         pCtg = pJob->pCtg;
  SRequestConnInfo* pConn = &pJob->conn;
  SCtgMsgCtx*       pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);

  if (NULL == pMsgCtx->pBatchs) {
    pMsgCtx->pBatchs = pJob->pBatchs;
  }

  CTG_ERR_RET(ctgGetSvrVerFromMnode(pCtg, pConn, NULL, pTask));

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2091
/*
 * Relaunch a table-meta task: reset it with ctgResetTbMetaTask() and launch it again.
 * Returns the launch error, if any, otherwise TSDB_CODE_SUCCESS.
 */
int32_t ctgRelaunchGetTbMetaTask(SCtgTask* pTask) {
  ctgResetTbMetaTask(pTask);

  CTG_ERR_RET(ctgLaunchGetTbMetaTask(pTask));
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2099
/*
 * Sub-task completion callback of the table-config task.
 *
 * Consumes pTask->subRes:
 *   - a failed sub task ends the parent task with the sub task's code;
 *   - a CTG_TASK_GET_TB_META result supplies pCtx->tbType;
 *   - a CTG_TASK_GET_DB_VGROUP result is used to resolve the table's vgroup into a newly
 *     allocated pCtx->pVgInfo.
 * On success the parent is resumed through ctgLaunchGetTbCfgTask().
 *
 * Every failure met here (sub-task error, allocation failure, vgroup lookup failure) is
 * what the task is ended with, so a local failure is not reported as success.
 */
int32_t ctgGetTbCfgCb(SCtgTask* pTask) {
  int32_t code = 0;

  CTG_ERR_JRET(pTask->subRes.code);

  SCtgTbCfgCtx* pCtx = (SCtgTbCfgCtx*)pTask->taskCtx;
  if (CTG_TASK_GET_TB_META == pTask->subRes.type) {
    pCtx->tbType = ((STableMeta*)pTask->subRes.res)->tableType;
  } else if (CTG_TASK_GET_DB_VGROUP == pTask->subRes.type) {
    SDBVgInfo* pDb = (SDBVgInfo*)pTask->subRes.res;

    pCtx->pVgInfo = taosMemoryCalloc(1, sizeof(SVgroupInfo));
    if (NULL == pCtx->pVgInfo) {
      // the lookup below writes through this pointer
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }
    CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pTask->pJob->pCtg, pDb, pCtx->pName, pCtx->pVgInfo));
  }

  CTG_RET(ctgLaunchGetTbCfgTask(pTask));

_return:

  // `code` equals subRes.code for a failed sub task and carries local failures otherwise
  CTG_RET(ctgHandleTaskEnd(pTask, code));
}

/*
 * Task comparator for db-vgroup tasks: *equal is set to true when the task's dbFName is
 * the same string as param (treated as a C string). Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgCompDbVgTasks(SCtgTask* pTask, void* param, bool* equal) {
  const SCtgDbVgCtx* pCtx = pTask->taskCtx;
  const char*        dbFName = param;

  *equal = (strcmp(pCtx->dbFName, dbFName) == 0);

  return TSDB_CODE_SUCCESS;
}

/*
 * Task comparator for table-meta tasks: *equal receives the result of tNameTbNameEqual()
 * on the task's name and param (treated as an SName*). Always returns TSDB_CODE_SUCCESS.
 */
int32_t ctgCompTbMetaTasks(SCtgTask* pTask, void* param, bool* equal) {
  SCtgTbMetaCtx* pCtx = pTask->taskCtx;
  SName*         pOther = (SName*)param;

  *equal = tNameTbNameEqual(pCtx->pName, pOther);

  return TSDB_CODE_SUCCESS;
}

/*
 * Clone routine for table-meta tasks: clones the task result (an STableMeta) into *pRes
 * via cloneTableMeta() and returns its code.
 */
int32_t ctgCloneTbMeta(SCtgTask* pTask, void** pRes) {
  CTG_RET(cloneTableMeta((STableMeta*)pTask->res, (STableMeta**)pRes));
}

/*
 * Clone routine for db-vgroup tasks: clones the dbVgroup held in the task's msgCtx.out
 * (an SUseDbOutput) into *pRes via cloneDbVgInfo() and returns its code.
 */
int32_t ctgCloneDbVg(SCtgTask* pTask, void** pRes) {
  SUseDbOutput* pUseDbOut = (SUseDbOutput*)pTask->msgCtx.out;
  SDBVgInfo**   ppDst = (SDBVgInfo**)pRes;

  CTG_RET(cloneDbVgInfo(pUseDbOut->dbVgroup, ppDst));
}

D
dapan1121 已提交
2149
SCtgAsyncFps gCtgAsyncFps[] = {
dengyihao's avatar
dengyihao 已提交
2150 2151 2152 2153 2154 2155 2156 2157 2158 2159 2160 2161 2162 2163 2164 2165
    {ctgInitGetQnodeTask, ctgLaunchGetQnodeTask, ctgHandleGetQnodeRsp, ctgDumpQnodeRes, NULL, NULL},
    {ctgInitGetDnodeTask, ctgLaunchGetDnodeTask, ctgHandleGetDnodeRsp, ctgDumpDnodeRes, NULL, NULL},
    {ctgInitGetDbVgTask, ctgLaunchGetDbVgTask, ctgHandleGetDbVgRsp, ctgDumpDbVgRes, ctgCompDbVgTasks, ctgCloneDbVg},
    {ctgInitGetDbCfgTask, ctgLaunchGetDbCfgTask, ctgHandleGetDbCfgRsp, ctgDumpDbCfgRes, NULL, NULL},
    {ctgInitGetDbInfoTask, ctgLaunchGetDbInfoTask, ctgHandleGetDbInfoRsp, ctgDumpDbInfoRes, NULL, NULL},
    {ctgInitGetTbMetaTask, ctgLaunchGetTbMetaTask, ctgHandleGetTbMetaRsp, ctgDumpTbMetaRes, ctgCompTbMetaTasks,
     ctgCloneTbMeta},
    {ctgInitGetTbHashTask, ctgLaunchGetTbHashTask, ctgHandleGetTbHashRsp, ctgDumpTbHashRes, NULL, NULL},
    {ctgInitGetTbIndexTask, ctgLaunchGetTbIndexTask, ctgHandleGetTbIndexRsp, ctgDumpTbIndexRes, NULL, NULL},
    {ctgInitGetTbCfgTask, ctgLaunchGetTbCfgTask, ctgHandleGetTbCfgRsp, ctgDumpTbCfgRes, NULL, NULL},
    {ctgInitGetIndexTask, ctgLaunchGetIndexTask, ctgHandleGetIndexRsp, ctgDumpIndexRes, NULL, NULL},
    {ctgInitGetUdfTask, ctgLaunchGetUdfTask, ctgHandleGetUdfRsp, ctgDumpUdfRes, NULL, NULL},
    {ctgInitGetUserTask, ctgLaunchGetUserTask, ctgHandleGetUserRsp, ctgDumpUserRes, NULL, NULL},
    {ctgInitGetSvrVerTask, ctgLaunchGetSvrVerTask, ctgHandleGetSvrVerRsp, ctgDumpSvrVer, NULL, NULL},
    {ctgInitGetTbMetasTask, ctgLaunchGetTbMetasTask, ctgHandleGetTbMetasRsp, ctgDumpTbMetasRes, NULL, NULL},
    {ctgInitGetTbHashsTask, ctgLaunchGetTbHashsTask, ctgHandleGetTbHashsRsp, ctgDumpTbHashsRes, NULL, NULL},
D
dapan1121 已提交
2166 2167
};

dengyihao's avatar
dengyihao 已提交
2168
/*
 * Run the result-dump routine (dumpResFp of the task's type) for every task of the job,
 * in array order. Stops at and returns the first error; TSDB_CODE_SUCCESS otherwise.
 */
int32_t ctgMakeAsyncRes(SCtgJob* pJob) {
  int32_t taskNum = taosArrayGetSize(pJob->pTasks);

  for (int32_t i = 0; i < taskNum; ++i) {
    SCtgTask* pTask = taosArrayGet(pJob->pTasks, i);
    CTG_ERR_RET((*gCtgAsyncFps[pTask->type].dumpResFp)(pTask));
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
2180
/*
 * Search the job's task list for a task of the given type that the type's comparator
 * (compFp) reports as equal to param.
 *
 * The search runs under the job's task read lock. On a match *taskId receives the
 * matching task's id; without a match *taskId is left untouched, so callers should
 * pre-set it to a sentinel. Returns the comparator's error code if one fails, otherwise
 * TSDB_CODE_SUCCESS.
 *
 * NOTE(review): compFp is NULL in gCtgAsyncFps for most task types; this must only be
 * called with a type that has a comparator.
 */
int32_t ctgSearchExistingTask(SCtgJob* pJob, CTG_TASK_TYPE type, void* param, int32_t* taskId) {
  bool    equal = false;
  int32_t code = 0;

  CTG_LOCK(CTG_READ, &pJob->taskLock);

  int32_t taskNum = taosArrayGetSize(pJob->pTasks);
  for (int32_t i = 0; i < taskNum; ++i) {
    SCtgTask* pTask = taosArrayGet(pJob->pTasks, i);
    if (type != pTask->type) {
      continue;
    }

    CTG_ERR_JRET((*gCtgAsyncFps[type].compFp)(pTask, param, &equal));
    if (equal) {
      // read the id while the lock still protects the array element pTask points into
      *taskId = pTask->taskId;
      break;
    }
  }

_return:

  CTG_UNLOCK(CTG_READ, &pJob->taskLock);

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
2210
/*
 * Attach parent task pTask to sub task pSub, under pSub's write lock.
 *
 * - If pSub is already CTG_TASK_DONE, its code is copied into pTask->subRes, its result
 *   is cloned into pTask->subRes.res with the clone routine of the SUB task's type, the
 *   parent adopts the sub task's pBatchs, and the parent's sub-result callback is run
 *   right away.
 * - Otherwise pTask is appended to pSub->pParents (created on first use).
 *
 * Returns the first error met, TSDB_CODE_OUT_OF_MEMORY if the parent list cannot be
 * created, otherwise TSDB_CODE_SUCCESS.
 */
int32_t ctgSetSubTaskCb(SCtgTask* pSub, SCtgTask* pTask) {
  int32_t code = 0;

  CTG_LOCK(CTG_WRITE, &pSub->lock);
  if (CTG_TASK_DONE == pSub->status) {
    pTask->subRes.code = pSub->code;
    // The result being cloned belongs to pSub, so the clone routine must come from pSub's
    // table row; the parent's own row may carry no clone routine at all.
    CTG_ERR_JRET((*gCtgAsyncFps[pSub->type].cloneFp)(pSub, &pTask->subRes.res));
    SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
    SCtgMsgCtx* pSubMsgCtx = CTG_GET_TASK_MSGCTX(pSub, -1);
    pMsgCtx->pBatchs = pSubMsgCtx->pBatchs;

    CTG_ERR_JRET(pTask->subRes.fp(pTask));
  } else {
    if (NULL == pSub->pParents) {
      pSub->pParents = taosArrayInit(4, POINTER_BYTES);
      if (NULL == pSub->pParents) {
        CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
      }
    }

    taosArrayPush(pSub->pParents, &pTask);
  }

_return:

  CTG_UNLOCK(CTG_WRITE, &pSub->lock);

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
2237
/*
 * Make pTask depend on a sub task of the given type/param, with fp as the callback that
 * runs once the sub result is available.
 *
 * pTask->subRes is cleared and set up first. An existing task matching (type, param) is
 * reused; otherwise a new one is created with ctgInitTask(). The parent is attached via
 * ctgSetSubTaskCb(). Only a newly created sub task is launched here: it inherits the
 * parent's pBatchs and is marked CTG_TASK_LAUNCHED after its launch routine succeeded.
 */
int32_t ctgLaunchSubTask(SCtgTask* pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp, void* param) {
  SCtgJob* pJob = pTask->pJob;
  int32_t  subId = -1;

  ctgClearSubTaskRes(&pTask->subRes);
  pTask->subRes.type = type;
  pTask->subRes.fp = fp;

  CTG_ERR_RET(ctgSearchExistingTask(pJob, type, param, &subId));

  const bool created = (subId < 0);
  if (created) {
    CTG_ERR_RET(ctgInitTask(pJob, type, param, &subId));
  }

  SCtgTask* pSub = taosArrayGet(pJob->pTasks, subId);

  CTG_ERR_RET(ctgSetSubTaskCb(pSub, pTask));

  if (!created) {
    // an already existing task is driven by whoever launched it
    return TSDB_CODE_SUCCESS;
  }

  SCtgMsgCtx* pParentMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
  SCtgMsgCtx* pSubMsgCtx = CTG_GET_TASK_MSGCTX(pSub, -1);
  pSubMsgCtx->pBatchs = pParentMsgCtx->pBatchs;

  CTG_ERR_RET((*gCtgAsyncFps[pSub->type].launchFp)(pSub));
  pSub->status = CTG_TASK_LAUNCHED;

  return TSDB_CODE_SUCCESS;
}
D
dapan1121 已提交
2267

dengyihao's avatar
dengyihao 已提交
2268
/*
 * Launch every task of the job in array order, marking each CTG_TASK_LAUNCHED after its
 * launch routine succeeded. The first launch error is returned immediately.
 *
 * A job without tasks schedules ctgCallUserCb through taosAsyncExec(). A job with tasks
 * calls ctgLaunchBatchs() when CTG_BATCH_FETCH is enabled.
 */
int32_t ctgLaunchJob(SCtgJob* pJob) {
  const int32_t taskNum = taosArrayGetSize(pJob->pTasks);
  int32_t       idx = 0;

  while (idx < taskNum) {
    SCtgTask* pTask = taosArrayGet(pJob->pTasks, idx);

    qDebug("QID:0x%" PRIx64 " ctg launch [%dth] task", pJob->queryId, pTask->taskId);
    CTG_ERR_RET((*gCtgAsyncFps[pTask->type].launchFp)(pTask));

    pTask->status = CTG_TASK_LAUNCHED;
    ++idx;
  }

  if (taskNum <= 0) {
    qDebug("QID:0x%" PRIx64 " ctg call user callback with rsp %s", pJob->queryId, tstrerror(pJob->jobResCode));

    taosAsyncExec(ctgCallUserCb, pJob, NULL);
    return TSDB_CODE_SUCCESS;
  }

#if CTG_BATCH_FETCH
  ctgLaunchBatchs(pJob->pCtg, pJob, pJob->pBatchs);
#endif

  return TSDB_CODE_SUCCESS;
}