ctgRemote.c 43.7 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "catalogInt.h"
dengyihao's avatar
dengyihao 已提交
17
#include "query.h"
D
dapan1121 已提交
18
#include "systable.h"
dengyihao's avatar
dengyihao 已提交
19
#include "tname.h"
D
dapan1121 已提交
20
#include "tref.h"
dengyihao's avatar
dengyihao 已提交
21
#include "trpc.h"
D
dapan1121 已提交
22
#include "ctgRemote.h"
D
dapan1121 已提交
23

dengyihao's avatar
dengyihao 已提交
24 25
/*
 * Fan a batched meta response out to every task that contributed a request to the batch.
 *
 * pJob     job owning the tasks referenced by cbParam->taskId
 * cbParam  callback parameter; taskId[i] / msgIdx[i] identify the i-th request of the batch
 * pMsg     raw response buffer; only read here, never freed by this function
 * rspCode  transport-level result; used for a task whenever its own sub-response code is 0
 *
 * When the response cannot be used (error rspCode, empty payload, or a count mismatch) every
 * task still gets its handler invoked, with an empty message and rspCode.
 *
 * Returns 0 on success, otherwise the first error hit while setting up or launching the
 * follow-up batches. The return value of the per-task handlers is not inspected.
 */
int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBuf* pMsg, int32_t rspCode) {
  int32_t       code = 0;
  // NOTE(review): pCtg is not referenced directly; presumably the ctgDebug/ctgError macros use it - confirm
  SCatalog*     pCtg = pJob->pCtg;
  int32_t       taskNum = taosArrayGetSize(cbParam->taskId);
  SDataBuf      taskMsg = *pMsg;
  int32_t       msgNum = 0;
  SBatchRsp     batchRsp = {0};
  SBatchRspMsg  rsp = {0};
  SBatchRspMsg* pRsp = NULL;
  // Declared before the first possible jump to _return so the cleanup code always sees a defined value.
  SHashObj*     pBatchs = NULL;

  if (TSDB_CODE_SUCCESS == rspCode && pMsg->pData && (pMsg->len > 0)) {
    if (tDeserializeSBatchRsp(pMsg->pData, pMsg->len, &batchRsp) < 0) {
      ctgError("tDeserializeSBatchRsp failed, msgLen:%d", pMsg->len);
      // Go through _return so that whatever the deserializer already put into batchRsp is released.
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }

    msgNum = taosArrayGetSize(batchRsp.pRsps);
  }

  // A response must carry either one entry per task or nothing at all; otherwise ignore its content.
  if (ASSERTS(taskNum == msgNum || 0 == msgNum, "taskNum %d mis-match msgNum %d", taskNum, msgNum)) {
    msgNum = 0;
  }

  ctgDebug("QID:0x%" PRIx64 " ctg got batch %d rsp %s", pJob->queryId, cbParam->batchId,
           TMSG_INFO(cbParam->reqType + 1));

  // Follow-up requests issued by the task handlers are collected here and sent after the loop.
  pBatchs = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (NULL == pBatchs) {
    ctgError("taosHashInit %d batch failed", taskNum);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  for (int32_t i = 0; i < taskNum; ++i) {
    int32_t*  taskId = taosArrayGet(cbParam->taskId, i);
    int32_t*  msgIdx = taosArrayGet(cbParam->msgIdx, i);
    SCtgTask* pTask = taosArrayGet(pJob->pTasks, *taskId);

    if (msgNum > 0) {
      pRsp = taosArrayGet(batchRsp.pRsps, i);

      if (ASSERTS(pRsp->msgIdx == *msgIdx, "rsp msgIdx %d mis-match msgIdx %d", pRsp->msgIdx, *msgIdx)) {
        // Sub-response does not belong to this request: hand the task an empty placeholder instead.
        pRsp = &rsp;
        pRsp->msgIdx = *msgIdx;
        pRsp->reqType = -1;
        pRsp->rspCode = 0;
        taskMsg.msgType = -1;
        taskMsg.pData = NULL;
        taskMsg.len = 0;
      } else {
        taskMsg.msgType = pRsp->reqType;
        taskMsg.pData = pRsp->msg;
        taskMsg.len = pRsp->msgLen;
      }
    } else {
      // No usable sub-responses: empty placeholder, the task sees rspCode only.
      pRsp = &rsp;
      pRsp->msgIdx = *msgIdx;
      pRsp->reqType = -1;
      pRsp->rspCode = 0;
      taskMsg.msgType = -1;
      taskMsg.pData = NULL;
      taskMsg.len = 0;
    }

    SCtgTaskReq tReq;
    tReq.pTask = pTask;
    tReq.msgIdx = pRsp->msgIdx;
    SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq.msgIdx);
    pMsgCtx->pBatchs = pBatchs;

    ctgDebug("QID:0x%" PRIx64 " ctg task %d idx %d start to handle rsp %s, pBatchs: %p", pJob->queryId, pTask->taskId,
             pRsp->msgIdx, TMSG_INFO(taskMsg.msgType + 1), pBatchs);

    // The sub-response's own code wins; fall back to the transport-level code when it is 0.
    (*gCtgAsyncFps[pTask->type].handleRspFp)(&tReq, pRsp->reqType, &taskMsg, (pRsp->rspCode ? pRsp->rspCode : rspCode));
  }

  CTG_ERR_JRET(ctgLaunchBatchs(pJob->pCtg, pJob, pBatchs));

_return:

  taosArrayDestroyEx(batchRsp.pRsps, tFreeSBatchRspMsg);

  ctgFreeBatchs(pBatchs);
  CTG_RET(code);
}

D
dapan1121 已提交
108 109
int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, int32_t rspCode, char* target) {
  int32_t code = 0;
dengyihao's avatar
dengyihao 已提交
110

D
dapan1121 已提交
111 112 113 114 115 116
  switch (reqType) {
    case TDMT_MND_QNODE_LIST: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for qnode list, error:%s", tstrerror(rspCode));
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
117

D
dapan1121 已提交
118 119 120 121 122
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process qnode list rsp failed, error:%s", tstrerror(rspCode));
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
123

D
dapan1121 已提交
124 125 126
      qDebug("Got qnode list from mnode, listNum:%d", (int32_t)taosArrayGetSize(out));
      break;
    }
D
dapan1121 已提交
127 128 129 130 131
    case TDMT_MND_DNODE_LIST: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for dnode list, error:%s", tstrerror(rspCode));
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
132

D
dapan1121 已提交
133 134 135 136 137
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process dnode list rsp failed, error:%s", tstrerror(rspCode));
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
138

D
dapan1121 已提交
139 140 141
      qDebug("Got dnode list from mnode, listNum:%d", (int32_t)taosArrayGetSize(*(SArray**)out));
      break;
    }
D
dapan1121 已提交
142 143 144 145 146
    case TDMT_MND_USE_DB: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for use db, error:%s, dbFName:%s", tstrerror(rspCode), target);
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
147

D
dapan1121 已提交
148 149 150 151 152
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process use db rsp failed, error:%s, dbFName:%s", tstrerror(code), target);
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
153

D
dapan1121 已提交
154 155 156 157 158 159 160 161
      qDebug("Got db vgInfo from mnode, dbFName:%s", target);
      break;
    }
    case TDMT_MND_GET_DB_CFG: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for get db cfg, error:%s, db:%s", tstrerror(rspCode), target);
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
162

D
dapan1121 已提交
163 164 165 166 167
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process get db cfg rsp failed, error:%s, db:%s", tstrerror(code), target);
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
168

D
dapan1121 已提交
169 170 171 172 173 174 175 176
      qDebug("Got db cfg from mnode, dbFName:%s", target);
      break;
    }
    case TDMT_MND_GET_INDEX: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for get index, error:%s, indexName:%s", tstrerror(rspCode), target);
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
177

D
dapan1121 已提交
178 179 180 181 182
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process get index rsp failed, error:%s, indexName:%s", tstrerror(code), target);
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
183

D
dapan1121 已提交
184 185 186
      qDebug("Got index from mnode, indexName:%s", target);
      break;
    }
D
dapan1121 已提交
187 188 189 190 191
    case TDMT_MND_GET_TABLE_INDEX: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for get table index, error:%s, tbFName:%s", tstrerror(rspCode), target);
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
192

D
dapan1121 已提交
193 194 195 196 197
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process get table index rsp failed, error:%s, tbFName:%s", tstrerror(code), target);
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
198

D
dapan1121 已提交
199 200 201
      qDebug("Got table index from mnode, tbFName:%s", target);
      break;
    }
D
dapan1121 已提交
202 203 204 205 206
    case TDMT_MND_RETRIEVE_FUNC: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for get udf, error:%s, funcName:%s", tstrerror(rspCode), target);
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
207

D
dapan1121 已提交
208 209 210 211 212
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process get udf rsp failed, error:%s, funcName:%s", tstrerror(code), target);
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
213

D
dapan1121 已提交
214 215 216 217 218 219 220 221
      qDebug("Got udf from mnode, funcName:%s", target);
      break;
    }
    case TDMT_MND_GET_USER_AUTH: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for get user auth, error:%s, user:%s", tstrerror(rspCode), target);
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
222

D
dapan1121 已提交
223 224 225 226 227
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process get user auth rsp failed, error:%s, user:%s", tstrerror(code), target);
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
228

D
dapan1121 已提交
229 230 231 232 233 234 235 236 237 238
      qDebug("Got user auth from mnode, user:%s", target);
      break;
    }
    case TDMT_MND_TABLE_META: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        if (CTG_TABLE_NOT_EXIST(rspCode)) {
          SET_META_TYPE_NULL(((STableMetaOutput*)out)->metaType);
          qDebug("stablemeta not exist in mnode, tbFName:%s", target);
          return TSDB_CODE_SUCCESS;
        }
dengyihao's avatar
dengyihao 已提交
239

D
dapan1121 已提交
240 241 242
        qError("error rsp for stablemeta from mnode, error:%s, tbFName:%s", tstrerror(rspCode), target);
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
243

D
dapan1121 已提交
244 245 246 247 248
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process mnode stablemeta rsp failed, error:%s, tbFName:%s", tstrerror(code), target);
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
249

D
dapan1121 已提交
250 251 252 253 254 255 256 257 258 259
      qDebug("Got table meta from mnode, tbFName:%s", target);
      break;
    }
    case TDMT_VND_TABLE_META: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        if (CTG_TABLE_NOT_EXIST(rspCode)) {
          SET_META_TYPE_NULL(((STableMetaOutput*)out)->metaType);
          qDebug("tablemeta not exist in vnode, tbFName:%s", target);
          return TSDB_CODE_SUCCESS;
        }
dengyihao's avatar
dengyihao 已提交
260

D
dapan1121 已提交
261 262 263
        qError("error rsp for table meta from vnode, code:%s, tbFName:%s", tstrerror(rspCode), target);
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
264

D
dapan1121 已提交
265 266 267 268 269
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process vnode tablemeta rsp failed, code:%s, tbFName:%s", tstrerror(code), target);
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
270

D
dapan1121 已提交
271 272 273
      qDebug("Got table meta from vnode, tbFName:%s", target);
      break;
    }
D
dapan1121 已提交
274 275 276 277 278
    case TDMT_VND_TABLE_CFG: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for table cfg from vnode, code:%s, tbFName:%s", tstrerror(rspCode), target);
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
279

D
dapan1121 已提交
280 281 282 283 284
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process vnode tb cfg rsp failed, code:%s, tbFName:%s", tstrerror(code), target);
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
285

D
dapan1121 已提交
286 287 288 289 290 291 292 293
      qDebug("Got table cfg from vnode, tbFName:%s", target);
      break;
    }
    case TDMT_MND_TABLE_CFG: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for stb cfg from mnode, error:%s, tbFName:%s", tstrerror(rspCode), target);
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
294

D
dapan1121 已提交
295 296 297 298 299
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process mnode stb cfg rsp failed, error:%s, tbFName:%s", tstrerror(code), target);
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
300

D
dapan1121 已提交
301 302
      qDebug("Got stb cfg from mnode, tbFName:%s", target);
      break;
dengyihao's avatar
dengyihao 已提交
303
    }
D
dapan1121 已提交
304 305 306 307 308
    case TDMT_MND_SERVER_VERSION: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for svr ver from mnode, error:%s", tstrerror(rspCode));
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
309

D
dapan1121 已提交
310 311 312 313 314
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process svr ver rsp failed, error:%s", tstrerror(code));
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
315

D
dapan1121 已提交
316 317 318
      qDebug("Got svr ver from mnode");
      break;
    }
D
dapan1121 已提交
319
    default:
D
dapan1121 已提交
320 321 322 323
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("Got error rsp, error:%s", tstrerror(rspCode));
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
324

D
dapan1121 已提交
325 326
      qError("invalid req type %s", TMSG_INFO(reqType));
      return TSDB_CODE_APP_ERROR;
D
dapan1121 已提交
327 328 329 330 331
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
332
/*
 * Transport callback invoked when the response to a catalog request arrives.
 *
 * param    the SCtgTaskCallbackParam attached to the request when it was sent
 * pMsg     response buffer; pMsg->pData and pMsg->pEpSet are always freed before returning
 * rspCode  result code of the request
 *
 * Batched responses (TDMT_VND_BATCH_META / TDMT_MND_BATCH_META) are forwarded to
 * ctgHandleBatchRsp; any other response is passed to the response handler of the first task
 * listed in the callback parameter. If the job can no longer be acquired from the job pool
 * the response is dropped and only the buffers are released.
 */
int32_t ctgHandleMsgCallback(void* param, SDataBuf* pMsg, int32_t rspCode) {
  SCtgTaskCallbackParam* cbParam = (SCtgTaskCallbackParam*)param;
  int32_t                code = 0;
  SCtgJob*               pJob = NULL;
#if CTG_BATCH_FETCH
  // Function scope (not block scope) so the cleanup at _return can release it on every path.
  SHashObj*              pBatchs = NULL;
#endif

  CTG_API_JENTER();

  pJob = taosAcquireRef(gCtgMgmt.jobPool, cbParam->refId);
  if (NULL == pJob) {
    qDebug("ctg job refId 0x%" PRIx64 " already dropped", cbParam->refId);
    goto _return;
  }

  // NOTE(review): only referenced implicitly; presumably the ctgError macro below uses it - confirm
  SCatalog* pCtg = pJob->pCtg;

  if (TDMT_VND_BATCH_META == cbParam->reqType || TDMT_MND_BATCH_META == cbParam->reqType) {
    CTG_ERR_JRET(ctgHandleBatchRsp(pJob, cbParam, pMsg, rspCode));
  } else {
    int32_t*  taskId = taosArrayGet(cbParam->taskId, 0);
    SCtgTask* pTask = taosArrayGet(pJob->pTasks, *taskId);

    qDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId,
           TMSG_INFO(cbParam->reqType + 1));

#if CTG_BATCH_FETCH
    // Requests the handler issues while processing this response are collected here.
    pBatchs = taosHashInit(CTG_DEFAULT_BATCH_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
    if (NULL == pBatchs) {
      ctgError("taosHashInit %d batch failed", CTG_DEFAULT_BATCH_NUM);
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }

    SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
    pMsgCtx->pBatchs = pBatchs;
#endif

    SCtgTaskReq tReq;
    tReq.pTask = pTask;
    tReq.msgIdx = -1;

    CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].handleRspFp)(&tReq, cbParam->reqType, pMsg, rspCode));

#if CTG_BATCH_FETCH
    CTG_ERR_JRET(ctgLaunchBatchs(pJob->pCtg, pJob, pBatchs));
#endif
  }

_return:

#if CTG_BATCH_FETCH
  // Release the batch table the same way ctgHandleBatchRsp does once its batches are launched;
  // pBatchs is still NULL when this point is reached before the table was created.
  ctgFreeBatchs(pBatchs);
#endif

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  if (pJob) {
    taosReleaseRef(gCtgMgmt.jobPool, cbParam->refId);
  }

  CTG_API_LEAVE(code);
}

D
dapan1121 已提交
391
int32_t ctgMakeMsgSendInfo(SCtgJob* pJob, SArray* pTaskId, int32_t batchId, SArray* pMsgIdx, int32_t msgType,
dengyihao's avatar
dengyihao 已提交
392
                           SMsgSendInfo** pMsgSendInfo) {
D
dapan1121 已提交
393
  int32_t       code = 0;
dengyihao's avatar
dengyihao 已提交
394
  SMsgSendInfo* msgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
D
dapan1121 已提交
395 396
  if (NULL == msgSendInfo) {
    qError("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
S
Shengliang Guan 已提交
397
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
398 399
  }

dengyihao's avatar
dengyihao 已提交
400
  SCtgTaskCallbackParam* param = taosMemoryCalloc(1, sizeof(SCtgTaskCallbackParam));
D
dapan1121 已提交
401 402
  if (NULL == param) {
    qError("calloc %d failed", (int32_t)sizeof(SCtgTaskCallbackParam));
S
Shengliang Guan 已提交
403
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
404 405 406
  }

  param->reqType = msgType;
D
dapan1121 已提交
407 408 409 410
  param->queryId = pJob->queryId;
  param->refId = pJob->refId;
  param->taskId = pTaskId;
  param->batchId = batchId;
D
dapan1121 已提交
411
  param->msgIdx = pMsgIdx;
D
dapan1121 已提交
412 413

  msgSendInfo->param = param;
414
  msgSendInfo->paramFreeFp = ctgFreeMsgSendParam;
D
dapan1121 已提交
415 416 417 418 419 420 421 422
  msgSendInfo->fp = ctgHandleMsgCallback;

  *pMsgSendInfo = msgSendInfo;

  return TSDB_CODE_SUCCESS;

_return:

423 424
  taosArrayDestroy(pTaskId);
  destroySendMsgInfo(msgSendInfo);
D
dapan1121 已提交
425 426 427 428

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
429
/*
 * Wrap an already serialized request into a send info and submit it asynchronously.
 *
 * pCtg      catalog handle (NOTE(review): presumably consumed by the ctg* log macros - confirm)
 * pConn     connection info providing the transport, the mnode ep set and the request ids
 * pJob      job the request belongs to
 * pTaskId   task ids answered by this request; passed on to ctgMakeMsgSendInfo
 * batchId   batch id recorded for the callback
 * pMsgIdx   message indexes recorded for the callback
 * dbFName   db name forwarded to ctgUpdateSendTargetInfo
 * vgId      vgroup id forwarded to ctgUpdateSendTargetInfo
 * msgType   request type
 * msg       serialized request, attached to the send info as its payload
 * msgSize   payload length
 *
 * Returns TSDB_CODE_SUCCESS once the request was submitted, otherwise the failing step's code.
 */
int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob* pJob, SArray* pTaskId, int32_t batchId,
                        SArray* pMsgIdx, char* dbFName, int32_t vgId, int32_t msgType, void* msg, uint32_t msgSize) {
  int32_t       code = 0;
  int64_t       transporterId = 0;
  SMsgSendInfo* pSendInfo = NULL;

  CTG_ERR_JRET(ctgMakeMsgSendInfo(pJob, pTaskId, batchId, pMsgIdx, msgType, &pSendInfo));

  ctgUpdateSendTargetInfo(pSendInfo, msgType, dbFName, vgId);

  // Describe the request itself ...
  pSendInfo->msgType = msgType;
  pSendInfo->msgInfo.handle = NULL;
  pSendInfo->msgInfo.pData = msg;
  pSendInfo->msgInfo.len = msgSize;

  // ... and who it is sent on behalf of.
  pSendInfo->requestId = pConn->requestId;
  pSendInfo->requestObjRefId = pConn->requestObjRefId;

  code = asyncSendMsgToServer(pConn->pTrans, &pConn->mgmtEps, &transporterId, pSendInfo);
  // The send info is treated as handed over regardless of the outcome; never touch it again here.
  pSendInfo = NULL;
  if (code) {
    ctgError("asyncSendMsgToSever failed, error: %s", tstrerror(code));
    CTG_ERR_JRET(code);
  }

  ctgDebug("ctg req msg sent, reqId:0x%" PRIx64 ", msg type:%d, %s", pJob->queryId, msgType, TMSG_INFO(msgType));
  return TSDB_CODE_SUCCESS;

_return:

  if (NULL != pSendInfo) {
    destroySendMsgInfo(pSendInfo);
  }

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
464 465 466 467
int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgTaskReq* tReq, int32_t msgType,
                    void* msg, uint32_t msgSize) {
  int32_t     code = 0;
  SCtgTask*   pTask = tReq->pTask;
D
dapan1121 已提交
468
  SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx);
dengyihao's avatar
dengyihao 已提交
469 470 471 472 473 474
  SHashObj*   pBatchs = pMsgCtx->pBatchs;
  SCtgJob*    pJob = pTask->pJob;
  SCtgBatch*  pBatch = taosHashGet(pBatchs, &vgId, sizeof(vgId));
  SCtgBatch   newBatch = {0};
  SBatchMsg   req = {0};

D
dapan1121 已提交
475
  if (NULL == pBatch) {
D
dapan1121 已提交
476 477
    newBatch.pMsgs = taosArrayInit(pJob->subTaskNum, sizeof(SBatchMsg));
    newBatch.pTaskIds = taosArrayInit(pJob->subTaskNum, sizeof(int32_t));
D
dapan1121 已提交
478 479
    newBatch.pMsgIdxs = taosArrayInit(pJob->subTaskNum, sizeof(int32_t));
    if (NULL == newBatch.pMsgs || NULL == newBatch.pTaskIds || NULL == newBatch.pMsgIdxs) {
D
dapan1121 已提交
480 481
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }
dengyihao's avatar
dengyihao 已提交
482

483
    newBatch.conn = *pConn;
D
dapan1121 已提交
484

D
dapan1121 已提交
485
    req.msgIdx = tReq->msgIdx;
D
dapan1121 已提交
486 487 488 489 490 491
    req.msgType = msgType;
    req.msgLen = msgSize;
    req.msg = msg;
    if (NULL == taosArrayPush(newBatch.pMsgs, &req)) {
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }
D
dapan1121 已提交
492
    msg = NULL;
D
dapan1121 已提交
493 494 495
    if (NULL == taosArrayPush(newBatch.pTaskIds, &pTask->taskId)) {
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }
D
dapan1121 已提交
496 497 498
    if (NULL == taosArrayPush(newBatch.pMsgIdxs, &req.msgIdx)) {
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }
D
dapan1121 已提交
499 500

    if (vgId > 0) {
D
dapan1121 已提交
501
      SName* pName = NULL;
D
dapan1121 已提交
502 503
      if (TDMT_VND_TABLE_CFG == msgType) {
        SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx;
D
dapan1121 已提交
504
        pName = ctx->pName;
D
dapan1121 已提交
505
      } else if (TDMT_VND_TABLE_META == msgType) {
D
dapan1121 已提交
506
        if (CTG_TASK_GET_TB_META_BATCH == pTask->type) {
507
          SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx;
dengyihao's avatar
dengyihao 已提交
508
          SCtgFetch*      fetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx);
509
          pName = ctgGetFetchName(ctx->pNames, fetch);
D
dapan1121 已提交
510 511 512 513
        } else {
          SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
          pName = ctx->pName;
        }
D
dapan1121 已提交
514 515 516 517
      } else {
        ctgError("invalid vnode msgType %d", msgType);
        CTG_ERR_JRET(TSDB_CODE_APP_ERROR);
      }
D
dapan1121 已提交
518 519

      tNameGetFullDbName(pName, newBatch.dbFName);
D
dapan1121 已提交
520 521 522 523 524 525 526 527 528
    }

    newBatch.msgType = (vgId > 0) ? TDMT_VND_BATCH_META : TDMT_MND_BATCH_META;
    newBatch.batchId = atomic_add_fetch_32(&pJob->batchId, 1);

    if (0 != taosHashPut(pBatchs, &vgId, sizeof(vgId), &newBatch, sizeof(newBatch))) {
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }

dengyihao's avatar
dengyihao 已提交
529 530
    ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), newBatch.batchId,
             vgId);
D
dapan1121 已提交
531 532 533 534

    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
535
  req.msgIdx = tReq->msgIdx;
D
dapan1121 已提交
536 537 538 539 540 541
  req.msgType = msgType;
  req.msgLen = msgSize;
  req.msg = msg;
  if (NULL == taosArrayPush(pBatch->pMsgs, &req)) {
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }
dengyihao's avatar
dengyihao 已提交
542
  msg = NULL;
D
dapan1121 已提交
543 544 545
  if (NULL == taosArrayPush(pBatch->pTaskIds, &pTask->taskId)) {
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }
D
dapan1121 已提交
546 547 548
  if (NULL == taosArrayPush(pBatch->pMsgIdxs, &req.msgIdx)) {
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }
dengyihao's avatar
dengyihao 已提交
549

D
dapan1121 已提交
550
  if (vgId > 0) {
D
dapan1121 已提交
551
    SName* pName = NULL;
D
dapan1121 已提交
552 553
    if (TDMT_VND_TABLE_CFG == msgType) {
      SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx;
D
dapan1121 已提交
554
      pName = ctx->pName;
D
dapan1121 已提交
555
    } else if (TDMT_VND_TABLE_META == msgType) {
D
dapan1121 已提交
556
      if (CTG_TASK_GET_TB_META_BATCH == pTask->type) {
557
        SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx;
dengyihao's avatar
dengyihao 已提交
558
        SCtgFetch*      fetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx);
559
        pName = ctgGetFetchName(ctx->pNames, fetch);
D
dapan1121 已提交
560 561 562 563
      } else {
        SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
        pName = ctx->pName;
      }
D
dapan1121 已提交
564 565 566 567
    } else {
      ctgError("invalid vnode msgType %d", msgType);
      CTG_ERR_JRET(TSDB_CODE_APP_ERROR);
    }
D
dapan1121 已提交
568

dengyihao's avatar
dengyihao 已提交
569
    tNameGetFullDbName(pName, pBatch->dbFName);
D
dapan1121 已提交
570 571
  }

dengyihao's avatar
dengyihao 已提交
572 573
  ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), pBatch->batchId,
           vgId);
D
dapan1121 已提交
574 575 576 577 578 579 580

  return TSDB_CODE_SUCCESS;

_return:

  ctgFreeBatch(&newBatch);
  taosMemoryFree(msg);
dengyihao's avatar
dengyihao 已提交
581

D
dapan1121 已提交
582 583 584
  return code;
}

D
dapan1121 已提交
585
/*
 * Serialize all requests queued in a batch into one batch request message.
 *
 * pBatch  batch whose pMsgs array is serialized
 * vgId    vgroup id written into the request header
 * msg     receives the newly allocated message buffer on success; set to NULL when the
 *         buffer had been allocated but serialization failed
 * pSize   receives the message length on success
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_CTG_INVALID_INPUT when the batch holds
 * CTG_MAX_REQ_IN_BATCH or more requests, or TSDB_CODE_OUT_OF_MEMORY when sizing, allocating
 * or serializing fails.
 */
int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg, int32_t* pSize) {
  int32_t num = taosArrayGetSize(pBatch->pMsgs);
  if (num >= CTG_MAX_REQ_IN_BATCH) {
    qError("too many msgs %d in one batch request", num);
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  SBatchReq batchReq = {0};

  batchReq.header.vgId = vgId;
  batchReq.pMsgs = pBatch->pMsgs;

  // First pass with a NULL buffer only computes the required size.
  int32_t msgSize = tSerializeSBatchReq(NULL, 0, &batchReq);
  if (msgSize < 0) {
    qError("tSerializeSBatchReq failed");
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  *msg = taosMemoryCalloc(1, msgSize);
  if (NULL == (*msg)) {
    qError("calloc batchReq msg failed, size:%d", msgSize);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }
  if (tSerializeSBatchReq(*msg, msgSize, &batchReq) < 0) {
    qError("tSerializeSBatchReq failed");
    // Do not leave a half-written buffer in the out-parameter of a failed call.
    taosMemoryFree(*msg);
    *msg = NULL;
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  *pSize = msgSize;

  qDebug("batch req %d to vg %d msg built with %d meta reqs", pBatch->batchId, vgId, num);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
620
/*
 * Build and asynchronously send one batch request for every batch stored in pBatchs.
 *
 * pCtg     catalog handle passed on to ctgAsyncSendMsg
 * pJob     job the batches belong to
 * pBatchs  hash table keyed by vgId holding SCtgBatch entries; the table itself is not freed
 *          here, but each sent batch has its pTaskIds pointer cleared because the array was
 *          passed to ctgAsyncSendMsg
 *
 * Stops at the first failure and returns its code; batches already sent stay sent.
 */
int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob* pJob, SHashObj* pBatchs) {
  int32_t code = 0;
  void*   msg = NULL;
  void*   p = taosHashIterate(pBatchs, NULL);
  while (NULL != p) {
    size_t     len = 0;
    int32_t*   vgId = taosHashGetKey(p, &len);
    SCtgBatch* pBatch = (SCtgBatch*)p;
    int32_t    msgSize = 0;

    ctgDebug("QID:0x%" PRIx64 " ctg start to launch batch %d", pJob->queryId, pBatch->batchId);

    CTG_ERR_JRET(ctgBuildBatchReqMsg(pBatch, *vgId, &msg, &msgSize));
    code = ctgAsyncSendMsg(pCtg, &pBatch->conn, pJob, pBatch->pTaskIds, pBatch->batchId, pBatch->pMsgIdxs,
                           pBatch->dbFName, *vgId, pBatch->msgType, msg, msgSize);
    // The task id array was handed to ctgAsyncSendMsg whatever the outcome.
    pBatch->pTaskIds = NULL;
    CTG_ERR_JRET(code);

    // The buffer is now the payload of the submitted send info. Forget it so that a failure
    // in a later iteration cannot free a buffer this function no longer owns.
    msg = NULL;

    p = taosHashIterate(pBatchs, p);
  }

  return TSDB_CODE_SUCCESS;

_return:

  if (p) {
    taosHashCancelIterate(pBatchs, p);
  }
  // NOTE(review): when ctgAsyncSendMsg fails after attaching msg to the send info, the send
  // path may already have released it - confirm ownership on that path.
  taosMemoryFree(msg);

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
653 654
/*
 * Request the qnode list from the mnode.
 *
 * pCtg   catalog handle
 * pConn  connection info (transport and mnode ep set)
 * out    array that receives the qnode list in the synchronous case
 * pTask  when non-NULL the request is issued asynchronously on behalf of this task (queued
 *        into a batch, or sent directly when CTG_BATCH_FETCH is off) and the result is
 *        delivered through the task's message context; when NULL the call blocks on
 *        rpcSendRecv and decodes the response into `out`
 *
 * Returns TSDB_CODE_SUCCESS or the code of the failing step.
 */
int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray* out, SCtgTask* pTask) {
  char*   msg = NULL;
  int32_t msgLen = 0;
  int32_t reqType = TDMT_MND_QNODE_LIST;
  // Async requests keep the payload in plain heap memory; the sync path uses an rpc buffer.
  void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;

  ctgDebug("try to get qnode list from mnode, mgmtEpInUse:%d", pConn->mgmtEps.inUse);

  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](NULL, &msg, 0, &msgLen, mallocFp);
  if (code) {
    ctgError("Build qnode list msg failed, error:%s", tstrerror(code));
    CTG_ERR_RET(code);
  }

  if (pTask) {
    void* pOut = taosArrayInit(4, sizeof(SQueryNodeLoad));
    if (NULL == pOut) {
      // msg was allocated with taosMemoryMalloc on this path and is not queued anywhere yet.
      taosMemoryFree(msg);
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    code = ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, NULL);
    if (code) {
      // NOTE(review): pOut is left alone - whether the message context already owns it when
      // ctgUpdateMsgCtx fails is not visible here; confirm.
      taosMemoryFree(msg);
      CTG_ERR_RET(code);
    }

#if CTG_BATCH_FETCH
    SCtgTaskReq tReq;
    tReq.pTask = pTask;
    tReq.msgIdx = -1;
    // ctgAddBatch releases msg itself when it fails before queuing it.
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
#else
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      taosMemoryFree(msg);
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    if (NULL == taosArrayPush(pTaskId, &pTask->taskId)) {
      taosArrayDestroy(pTaskId);
      taosMemoryFree(msg);
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
#endif
  }

  SRpcMsg rpcMsg = {
      .msgType = reqType,
      .pCont = msg,
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);

  code = ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, NULL);

  // Release the response buffer whether or not decoding succeeded.
  if (NULL != rpcRsp.pCont) {
    rpcFreeCont(rpcRsp.pCont);
  }

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
707 708
/*
 * Request the dnode list from the mnode.
 *
 * pCtg   catalog handle
 * pConn  connection info (transport and mnode ep set)
 * out    receives the dnode list array in the synchronous case
 * pTask  when non-NULL the request is issued asynchronously on behalf of this task (queued
 *        into a batch, or sent directly when CTG_BATCH_FETCH is off); when NULL the call
 *        blocks on rpcSendRecv and decodes the response into `out`
 *
 * Returns TSDB_CODE_SUCCESS or the code of the failing step.
 */
int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray** out, SCtgTask* pTask) {
  char*   msg = NULL;
  int32_t msgLen = 0;
  int32_t reqType = TDMT_MND_DNODE_LIST;
  // Async requests keep the payload in plain heap memory; the sync path uses an rpc buffer.
  void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;

  ctgDebug("try to get dnode list from mnode, mgmtEpInUse:%d", pConn->mgmtEps.inUse);

  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](NULL, &msg, 0, &msgLen, mallocFp);
  if (code) {
    ctgError("Build dnode list msg failed, error:%s", tstrerror(code));
    CTG_ERR_RET(code);
  }

  if (pTask) {
    code = ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, NULL, NULL);
    if (code) {
      // msg was allocated with taosMemoryMalloc on this path and is not queued anywhere yet.
      taosMemoryFree(msg);
      CTG_ERR_RET(code);
    }

#if CTG_BATCH_FETCH
    SCtgTaskReq tReq;
    tReq.pTask = pTask;
    tReq.msgIdx = -1;
    // ctgAddBatch releases msg itself when it fails before queuing it.
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
#else
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      taosMemoryFree(msg);
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    if (NULL == taosArrayPush(pTaskId, &pTask->taskId)) {
      taosArrayDestroy(pTaskId);
      taosMemoryFree(msg);
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
#endif
  }

  SRpcMsg rpcMsg = {
      .msgType = reqType,
      .pCont = msg,
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);

  code = ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, NULL);

  // Release the response buffer whether or not decoding succeeded.
  if (NULL != rpcRsp.pCont) {
    rpcFreeCont(rpcRsp.pCont);
  }

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
756
int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SBuildUseDBInput* input, SUseDbOutput* out,
D
dapan1121 已提交
757
                                SCtgTaskReq* tReq) {
dengyihao's avatar
dengyihao 已提交
758 759 760
  char*     msg = NULL;
  int32_t   msgLen = 0;
  int32_t   reqType = TDMT_MND_USE_DB;
D
dapan1121 已提交
761
  SCtgTask* pTask = tReq ? tReq->pTask : NULL;
wafwerar's avatar
wafwerar 已提交
762
  void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
D
dapan1121 已提交
763 764 765

  ctgDebug("try to get db vgInfo from mnode, dbFName:%s", input->db);

D
dapan1121 已提交
766
  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](input, &msg, 0, &msgLen, mallocFp);
D
dapan1121 已提交
767 768 769 770 771 772 773 774 775 776
  if (code) {
    ctgError("Build use db msg failed, code:%x, db:%s", code, input->db);
    CTG_ERR_RET(code);
  }

  if (pTask) {
    void* pOut = taosMemoryCalloc(1, sizeof(SUseDbOutput));
    if (NULL == pOut) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
D
dapan1121 已提交
777

D
dapan1121 已提交
778
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx), reqType, pOut, input->db));
D
dapan1121 已提交
779

780
#if CTG_BATCH_FETCH
D
dapan1121 已提交
781
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, tReq, reqType, msg, msgLen));
782
#else
D
dapan1121 已提交
783 784 785 786 787
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);
dengyihao's avatar
dengyihao 已提交
788

D
dapan1121 已提交
789
    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
790
#endif
D
dapan1121 已提交
791
  }
dengyihao's avatar
dengyihao 已提交
792

D
dapan1121 已提交
793 794
  SRpcMsg rpcMsg = {
      .msgType = reqType,
dengyihao's avatar
dengyihao 已提交
795
      .pCont = msg,
D
dapan1121 已提交
796 797 798 799
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
D
dapan1121 已提交
800
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
801 802

  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, input->db));
dengyihao's avatar
dengyihao 已提交
803

D
dapan1121 已提交
804
  rpcFreeCont(rpcRsp.pCont);
dengyihao's avatar
dengyihao 已提交
805

D
dapan1121 已提交
806 807 808
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
809 810 811
/**
 * Fetch a database's configuration from the mnode (TDMT_MND_GET_DB_CFG).
 *
 * With a task (pTask != NULL) a zeroed SDbCfgInfo is allocated and passed,
 * together with dbFName, to ctgUpdateMsgCtx(); the message is then handed
 * to ctgAddBatch() (or ctgAsyncSendMsg() when CTG_BATCH_FETCH is disabled).
 * NOTE(review): the statements after the task branch rely on CTG_RET always
 * returning to the caller; confirm against the macro definition.
 *
 * Without a task the request is sent with a blocking rpcSendRecv() to
 * pConn->mgmtEps and the reply is decoded into `out` by ctgProcessRspMsg().
 *
 * @param pCtg     catalog handle, forwarded to ctgAddBatch()/ctgAsyncSendMsg()
 * @param pConn    connection info; pTrans and mgmtEps drive the blocking call
 * @param dbFName  database name handed to the message builder
 * @param out      destination passed to ctgProcessRspMsg() on the blocking path
 * @param pTask    owning task, or NULL to run synchronously
 * @return TSDB_CODE_SUCCESS after a processed blocking reply, otherwise the
 *         code reported by the failing step
 */
int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName, SDbCfgInfo* out,
                             SCtgTask* pTask) {
  char*   msg = NULL;
  int32_t msgLen = 0;
  int32_t reqType = TDMT_MND_GET_DB_CFG;
  void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;

  ctgDebug("try to get db cfg from mnode, dbFName:%s", dbFName);

  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)dbFName, &msg, 0, &msgLen, mallocFp);
  if (code) {
    ctgError("Build get db cfg msg failed, code:%x, db:%s", code, dbFName);
    CTG_ERR_RET(code);
  }

  if (pTask) {
    void* pOut = taosMemoryCalloc(1, sizeof(SDbCfgInfo));
    if (NULL == pOut) {
      // NOTE(review): msg is not released on this early return; confirm whether it leaks.
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, (char*)dbFName));

#if CTG_BATCH_FETCH
    // Designated initializer zero-fills any member not named here.
    SCtgTaskReq tReq = {.pTask = pTask, .msgIdx = -1};
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
#else
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);

    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
#endif
  }

  // Use reqType (same value as before) so the message type has one source.
  SRpcMsg rpcMsg = {
      .msgType = reqType,
      .pCont = msg,
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);

  // Free the response buffer before propagating a processing error so it is
  // released on both the success and the failure path.
  code = ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)dbFName);
  rpcFreeCont(rpcRsp.pCont);
  CTG_ERR_RET(code);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
864 865 866
/**
 * Fetch index information from the mnode (TDMT_MND_GET_INDEX).
 *
 * With a task (pTask != NULL) a zeroed SIndexInfo is allocated and passed,
 * together with indexName, to ctgUpdateMsgCtx(); the message is then handed
 * to ctgAddBatch() (or ctgAsyncSendMsg() when CTG_BATCH_FETCH is disabled).
 * NOTE(review): the statements after the task branch rely on CTG_RET always
 * returning to the caller; confirm against the macro definition.
 *
 * Without a task the request is sent with a blocking rpcSendRecv() to
 * pConn->mgmtEps and the reply is decoded into `out` by ctgProcessRspMsg().
 *
 * @param pCtg       catalog handle, forwarded to ctgAddBatch()/ctgAsyncSendMsg()
 * @param pConn      connection info; pTrans and mgmtEps drive the blocking call
 * @param indexName  index name handed to the message builder
 * @param out        destination passed to ctgProcessRspMsg() on the blocking path
 * @param pTask      owning task, or NULL to run synchronously
 * @return TSDB_CODE_SUCCESS after a processed blocking reply, otherwise the
 *         code reported by the failing step
 */
int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* indexName, SIndexInfo* out,
                                 SCtgTask* pTask) {
  char*   msg = NULL;
  int32_t msgLen = 0;
  int32_t reqType = TDMT_MND_GET_INDEX;
  void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;

  ctgDebug("try to get index from mnode, indexName:%s", indexName);

  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)indexName, &msg, 0, &msgLen, mallocFp);
  if (code) {
    // The logged value is the index name, so label it as such.
    ctgError("Build get index msg failed, code:%x, indexName:%s", code, indexName);
    CTG_ERR_RET(code);
  }

  if (pTask) {
    void* pOut = taosMemoryCalloc(1, sizeof(SIndexInfo));
    if (NULL == pOut) {
      // NOTE(review): msg is not released on this early return; confirm whether it leaks.
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, (char*)indexName));

#if CTG_BATCH_FETCH
    // Designated initializer zero-fills any member not named here.
    SCtgTaskReq tReq = {.pTask = pTask, .msgIdx = -1};
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
#else
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);

    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
#endif
  }

  SRpcMsg rpcMsg = {
      .msgType = reqType,
      .pCont = msg,
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);

  // Free the response buffer before propagating a processing error so it is
  // released on both the success and the failure path.
  code = ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)indexName);
  rpcFreeCont(rpcRsp.pCont);
  CTG_ERR_RET(code);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
919 920 921
/**
 * Fetch a table's index list from the mnode (TDMT_MND_GET_TABLE_INDEX).
 *
 * The full table name is extracted from `name` with tNameExtractFullName()
 * and used as the builder input, the message-context target and the log
 * name.
 *
 * With a task (pTask != NULL) a zeroed STableIndex is allocated and passed
 * to ctgUpdateMsgCtx(); the message is then handed to ctgAddBatch() (or
 * ctgAsyncSendMsg() when CTG_BATCH_FETCH is disabled).
 * NOTE(review): the statements after the task branch rely on CTG_RET always
 * returning to the caller; confirm against the macro definition.
 *
 * Without a task the request is sent with a blocking rpcSendRecv() to
 * pConn->mgmtEps and the reply is decoded into `out` by ctgProcessRspMsg().
 *
 * @param pCtg   catalog handle, forwarded to ctgAddBatch()/ctgAsyncSendMsg()
 * @param pConn  connection info; pTrans and mgmtEps drive the blocking call
 * @param name   table name to query
 * @param out    destination passed to ctgProcessRspMsg() on the blocking path
 * @param pTask  owning task, or NULL to run synchronously
 * @return TSDB_CODE_SUCCESS after a processed blocking reply, otherwise the
 *         code reported by the failing step
 */
int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* name, STableIndex* out,
                               SCtgTask* pTask) {
  char*   msg = NULL;
  int32_t msgLen = 0;
  int32_t reqType = TDMT_MND_GET_TABLE_INDEX;
  void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
  char tbFName[TSDB_TABLE_FNAME_LEN];
  tNameExtractFullName(name, tbFName);

  ctgDebug("try to get tb index from mnode, tbFName:%s", tbFName);

  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)tbFName, &msg, 0, &msgLen, mallocFp);
  if (code) {
    ctgError("Build get index msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName);
    CTG_ERR_RET(code);
  }

  if (pTask) {
    void* pOut = taosMemoryCalloc(1, sizeof(STableIndex));
    if (NULL == pOut) {
      // NOTE(review): msg is not released on this early return; confirm whether it leaks.
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, (char*)tbFName));

#if CTG_BATCH_FETCH
    // Designated initializer zero-fills any member not named here.
    SCtgTaskReq tReq = {.pTask = pTask, .msgIdx = -1};
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
#else
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);

    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
#endif
  }

  SRpcMsg rpcMsg = {
      .msgType = reqType,
      .pCont = msg,
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);

  // Free the response buffer before propagating a processing error so it is
  // released on both the success and the failure path.
  code = ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName);
  rpcFreeCont(rpcRsp.pCont);
  CTG_ERR_RET(code);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
976 977 978
/**
 * Fetch user-defined function info from the mnode (TDMT_MND_RETRIEVE_FUNC).
 *
 * With a task (pTask != NULL) a zeroed SFuncInfo is allocated and passed,
 * together with funcName, to ctgUpdateMsgCtx(); the message is then handed
 * to ctgAddBatch() (or ctgAsyncSendMsg() when CTG_BATCH_FETCH is disabled).
 * NOTE(review): the statements after the task branch rely on CTG_RET always
 * returning to the caller; confirm against the macro definition.
 *
 * Without a task the request is sent with a blocking rpcSendRecv() to
 * pConn->mgmtEps and the reply is decoded into `out` by ctgProcessRspMsg().
 *
 * @param pCtg      catalog handle, forwarded to ctgAddBatch()/ctgAsyncSendMsg()
 * @param pConn     connection info; pTrans and mgmtEps drive the blocking call
 * @param funcName  function name handed to the message builder
 * @param out       destination passed to ctgProcessRspMsg() on the blocking path
 * @param pTask     owning task, or NULL to run synchronously
 * @return TSDB_CODE_SUCCESS after a processed blocking reply, otherwise the
 *         code reported by the failing step
 */
int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* funcName, SFuncInfo* out,
                               SCtgTask* pTask) {
  char*   msg = NULL;
  int32_t msgLen = 0;
  int32_t reqType = TDMT_MND_RETRIEVE_FUNC;
  void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;

  ctgDebug("try to get udf info from mnode, funcName:%s", funcName);

  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)funcName, &msg, 0, &msgLen, mallocFp);
  if (code) {
    // The logged value is the function name, so label it as such.
    ctgError("Build get udf msg failed, code:%x, funcName:%s", code, funcName);
    CTG_ERR_RET(code);
  }

  if (pTask) {
    void* pOut = taosMemoryCalloc(1, sizeof(SFuncInfo));
    if (NULL == pOut) {
      // NOTE(review): msg is not released on this early return; confirm whether it leaks.
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, (char*)funcName));

#if CTG_BATCH_FETCH
    // Designated initializer zero-fills any member not named here.
    SCtgTaskReq tReq = {.pTask = pTask, .msgIdx = -1};
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
#else
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);

    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
#endif
  }

  SRpcMsg rpcMsg = {
      .msgType = reqType,
      .pCont = msg,
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);

  // Free the response buffer before propagating a processing error so it is
  // released on both the success and the failure path.
  code = ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)funcName);
  rpcFreeCont(rpcRsp.pCont);
  CTG_ERR_RET(code);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1031 1032 1033
/**
 * Fetch a user's authorization info from the mnode (TDMT_MND_GET_USER_AUTH).
 *
 * With a task (pTask != NULL) a zeroed SGetUserAuthRsp is allocated and
 * passed, together with `user`, to ctgUpdateMsgCtx(); the message is then
 * handed to ctgAddBatch() (or ctgAsyncSendMsg() when CTG_BATCH_FETCH is
 * disabled).
 * NOTE(review): the statements after the task branch rely on CTG_RET always
 * returning to the caller; confirm against the macro definition.
 *
 * Without a task the request is sent with a blocking rpcSendRecv() to
 * pConn->mgmtEps and the reply is decoded into `out` by ctgProcessRspMsg().
 *
 * @param pCtg   catalog handle, forwarded to ctgAddBatch()/ctgAsyncSendMsg()
 * @param pConn  connection info; pTrans and mgmtEps drive the blocking call
 * @param user   user name handed to the message builder
 * @param out    destination passed to ctgProcessRspMsg() on the blocking path
 * @param pTask  owning task, or NULL to run synchronously
 * @return TSDB_CODE_SUCCESS after a processed blocking reply, otherwise the
 *         code reported by the failing step
 */
int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* user, SGetUserAuthRsp* out,
                                  SCtgTask* pTask) {
  char*   msg = NULL;
  int32_t msgLen = 0;
  int32_t reqType = TDMT_MND_GET_USER_AUTH;
  void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;

  ctgDebug("try to get user auth from mnode, user:%s", user);

  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)user, &msg, 0, &msgLen, mallocFp);
  if (code) {
    // The logged value is the user name, so label it as such.
    ctgError("Build get user auth msg failed, code:%x, user:%s", code, user);
    CTG_ERR_RET(code);
  }

  if (pTask) {
    void* pOut = taosMemoryCalloc(1, sizeof(SGetUserAuthRsp));
    if (NULL == pOut) {
      // NOTE(review): msg is not released on this early return; confirm whether it leaks.
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, (char*)user));

#if CTG_BATCH_FETCH
    // Designated initializer zero-fills any member not named here.
    SCtgTaskReq tReq = {.pTask = pTask, .msgIdx = -1};
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
#else
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);

    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
#endif
  }

  SRpcMsg rpcMsg = {
      .msgType = reqType,
      .pCont = msg,
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);

  // Free the response buffer before propagating a processing error so it is
  // released on both the success and the failure path.
  code = ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)user);
  rpcFreeCont(rpcRsp.pCont);
  CTG_ERR_RET(code);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1086
int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo* pConn, char* dbFName, char* tbName,
D
dapan1121 已提交
1087
                                  STableMetaOutput* out, SCtgTaskReq* tReq) {
dengyihao's avatar
dengyihao 已提交
1088
  SCtgTask*        pTask = tReq ? tReq->pTask : NULL;
D
dapan1121 已提交
1089
  SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = tbName};
dengyihao's avatar
dengyihao 已提交
1090 1091 1092 1093 1094
  char*            msg = NULL;
  SEpSet*          pVnodeEpSet = NULL;
  int32_t          msgLen = 0;
  int32_t          reqType = TDMT_MND_TABLE_META;
  char             tbFName[TSDB_TABLE_FNAME_LEN];
D
dapan1121 已提交
1095
  sprintf(tbFName, "%s.%s", dbFName, tbName);
wafwerar's avatar
wafwerar 已提交
1096
  void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
D
dapan1121 已提交
1097 1098 1099

  ctgDebug("try to get table meta from mnode, tbFName:%s", tbFName);

D
dapan1121 已提交
1100
  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp);
D
dapan1121 已提交
1101 1102 1103 1104 1105 1106 1107 1108 1109 1110
  if (code) {
    ctgError("Build mnode stablemeta msg failed, code:%x", code);
    CTG_ERR_RET(code);
  }

  if (pTask) {
    void* pOut = taosMemoryCalloc(1, sizeof(STableMetaOutput));
    if (NULL == pOut) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
D
dapan1121 已提交
1111

D
dapan1121 已提交
1112
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx), reqType, pOut, tbFName));
dengyihao's avatar
dengyihao 已提交
1113

1114
#if CTG_BATCH_FETCH
D
dapan1121 已提交
1115
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, tReq, reqType, msg, msgLen));
dengyihao's avatar
dengyihao 已提交
1116
#else
D
dapan1121 已提交
1117 1118 1119 1120 1121
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);
dengyihao's avatar
dengyihao 已提交
1122

D
dapan1121 已提交
1123
    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
1124
#endif
D
dapan1121 已提交
1125 1126 1127 1128
  }

  SRpcMsg rpcMsg = {
      .msgType = reqType,
dengyihao's avatar
dengyihao 已提交
1129
      .pCont = msg,
D
dapan1121 已提交
1130 1131 1132 1133
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
D
dapan1121 已提交
1134
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);
dengyihao's avatar
dengyihao 已提交
1135

D
dapan1121 已提交
1136 1137
  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, tbFName));

D
dapan1121 已提交
1138 1139
  rpcFreeCont(rpcRsp.pCont);

D
dapan1121 已提交
1140 1141 1142
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1143
int32_t ctgGetTbMetaFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, STableMetaOutput* out,
D
dapan1121 已提交
1144
                              SCtgTaskReq* tReq) {
D
dapan1121 已提交
1145 1146 1147
  char dbFName[TSDB_DB_FNAME_LEN];
  tNameGetFullDbName(pTableName, dbFName);

D
dapan1121 已提交
1148
  return ctgGetTbMetaFromMnodeImpl(pCtg, pConn, dbFName, (char*)pTableName->tname, out, tReq);
D
dapan1121 已提交
1149 1150
}

dengyihao's avatar
dengyihao 已提交
1151
int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* vgroupInfo,
D
dapan1121 已提交
1152
                              STableMetaOutput* out, SCtgTaskReq* tReq) {
dengyihao's avatar
dengyihao 已提交
1153 1154
  SCtgTask* pTask = tReq ? tReq->pTask : NULL;
  char      dbFName[TSDB_DB_FNAME_LEN];
D
dapan1121 已提交
1155 1156
  tNameGetFullDbName(pTableName, dbFName);
  int32_t reqType = TDMT_VND_TABLE_META;
dengyihao's avatar
dengyihao 已提交
1157
  char    tbFName[TSDB_TABLE_FNAME_LEN];
D
dapan1121 已提交
1158
  sprintf(tbFName, "%s.%s", dbFName, pTableName->tname);
wafwerar's avatar
wafwerar 已提交
1159
  void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
D
dapan1121 已提交
1160

D
dapan1121 已提交
1161
  SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse];
dengyihao's avatar
dengyihao 已提交
1162 1163
  ctgDebug("try to get table meta from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s", vgroupInfo->vgId,
           vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName);
D
dapan1121 已提交
1164

dengyihao's avatar
dengyihao 已提交
1165 1166 1167
  SBuildTableInput bInput = {
      .vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char*)tNameGetTableName(pTableName)};
  char*   msg = NULL;
D
dapan1121 已提交
1168 1169
  int32_t msgLen = 0;

D
dapan1121 已提交
1170
  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp);
D
dapan1121 已提交
1171 1172 1173 1174 1175 1176 1177 1178 1179 1180
  if (code) {
    ctgError("Build vnode tablemeta msg failed, code:%x, tbFName:%s", code, tbFName);
    CTG_ERR_RET(code);
  }

  if (pTask) {
    void* pOut = taosMemoryCalloc(1, sizeof(STableMetaOutput));
    if (NULL == pOut) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
D
dapan1121 已提交
1181

dengyihao's avatar
dengyihao 已提交
1182 1183 1184 1185
    SRequestConnInfo vConn = {.pTrans = pConn->pTrans,
                              .requestId = pConn->requestId,
                              .requestObjRefId = pConn->requestObjRefId,
                              .mgmtEps = vgroupInfo->epSet};
D
dapan1121 已提交
1186

dengyihao's avatar
dengyihao 已提交
1187
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx), reqType, pOut, tbFName));
D
dapan1121 已提交
1188

D
dapan1121 已提交
1189
#if CTG_BATCH_FETCH
D
dapan1121 已提交
1190
    CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, tReq, reqType, msg, msgLen));
dengyihao's avatar
dengyihao 已提交
1191
#else
D
dapan1121 已提交
1192
    SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
dengyihao's avatar
dengyihao 已提交
1193
    char           dbFName[TSDB_DB_FNAME_LEN];
D
dapan1121 已提交
1194 1195 1196 1197 1198 1199 1200
    tNameGetFullDbName(ctx->pName, dbFName);
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);

D
dapan1121 已提交
1201
    CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, NULL, dbFName, ctx->vgId, reqType, msg, msgLen));
dengyihao's avatar
dengyihao 已提交
1202
#endif
D
dapan1121 已提交
1203 1204 1205 1206
  }

  SRpcMsg rpcMsg = {
      .msgType = reqType,
dengyihao's avatar
dengyihao 已提交
1207
      .pCont = msg,
D
dapan1121 已提交
1208 1209 1210 1211
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
D
dapan1121 已提交
1212
  rpcSendRecv(pConn->pTrans, &vgroupInfo->epSet, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
1213

dengyihao's avatar
dengyihao 已提交
1214
  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, tbFName));
D
dapan1121 已提交
1215

D
dapan1121 已提交
1216 1217
  rpcFreeCont(rpcRsp.pCont);

D
dapan1121 已提交
1218 1219 1220
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1221 1222 1223
/**
 * Fetch a table's configuration from a vnode (TDMT_VND_TABLE_CFG).
 *
 * The full table name (tNameExtractFullName) is used as the message-context
 * target and the log name; the full database name and pTableName->tname
 * form the build input together with vgroupInfo->vgId.
 *
 * With a task (pTask != NULL) the request type and target are recorded via
 * ctgUpdateMsgCtx(). A connection copy whose mgmtEps is replaced by
 * vgroupInfo->epSet is then used to hand the message to ctgAddBatch() (or
 * ctgAsyncSendMsg() when CTG_BATCH_FETCH is disabled).
 * NOTE(review): the statements after the task branch rely on CTG_RET always
 * returning to the caller; confirm against the macro definition.
 *
 * Without a task the request is sent with a blocking rpcSendRecv() to
 * vgroupInfo->epSet and the reply is decoded into `out` by
 * ctgProcessRspMsg().
 *
 * @param pCtg        catalog handle, forwarded to ctgAddBatch()/ctgAsyncSendMsg()
 * @param pConn       connection info; pTrans, requestId and requestObjRefId
 *                    are reused
 * @param pTableName  table whose configuration is requested
 * @param vgroupInfo  target vgroup; supplies vgId and the endpoint set
 * @param out         destination passed to ctgProcessRspMsg() on the
 *                    blocking path
 * @param pTask       owning task, or NULL to run synchronously
 * @return TSDB_CODE_SUCCESS after a processed blocking reply, otherwise the
 *         code reported by the failing step
 */
int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName,
                                SVgroupInfo* vgroupInfo, STableCfg** out, SCtgTask* pTask) {
  char*   msg = NULL;
  int32_t msgLen = 0;
  int32_t reqType = TDMT_VND_TABLE_CFG;
  char    tbFName[TSDB_TABLE_FNAME_LEN];
  tNameExtractFullName(pTableName, tbFName);
  void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
  char dbFName[TSDB_DB_FNAME_LEN];
  tNameGetFullDbName(pTableName, dbFName);
  SBuildTableInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char*)pTableName->tname};

  SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse];
  ctgDebug("try to get table cfg from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s", vgroupInfo->vgId,
           vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName);

  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp);
  if (code) {
    ctgError("Build get tb cfg msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName);
    CTG_ERR_RET(code);
  }

  if (pTask) {
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, NULL, (char*)tbFName));

    // Same request identity as pConn, but addressed to the vgroup's endpoints.
    SRequestConnInfo vConn = {.pTrans = pConn->pTrans,
                              .requestId = pConn->requestId,
                              .requestObjRefId = pConn->requestObjRefId,
                              .mgmtEps = vgroupInfo->epSet};
#if CTG_BATCH_FETCH
    // Designated initializer zero-fills any member not named here.
    SCtgTaskReq tReq = {.pTask = pTask, .msgIdx = -1};
    CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, &tReq, reqType, msg, msgLen));
#else
    SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx;
    // Separate name so the function-level dbFName is not shadowed.
    char          ctxDbFName[TSDB_DB_FNAME_LEN];
    tNameGetFullDbName(ctx->pName, ctxDbFName);
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);

    CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, NULL, ctxDbFName, ctx->pVgInfo->vgId, reqType, msg,
                            msgLen));
#endif
  }

  SRpcMsg rpcMsg = {
      .msgType = reqType,
      .pCont = msg,
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
  rpcSendRecv(pConn->pTrans, &vgroupInfo->epSet, &rpcMsg, &rpcRsp);

  // Free the response buffer before propagating a processing error so it is
  // released on both the success and the failure path.
  code = ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName);
  rpcFreeCont(rpcRsp.pCont);
  CTG_ERR_RET(code);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1286 1287 1288
/**
 * Fetch a table's configuration from the mnode (TDMT_MND_TABLE_CFG).
 *
 * The full table name (tNameExtractFullName) is used as the message-context
 * target and the log name; the full database name and pTableName->tname
 * form the build input with vgId 0.
 *
 * With a task (pTask != NULL) the request type and target are recorded via
 * ctgUpdateMsgCtx() and the message is handed to ctgAddBatch() (or
 * ctgAsyncSendMsg() when CTG_BATCH_FETCH is disabled).
 * NOTE(review): the statements after the task branch rely on CTG_RET always
 * returning to the caller; confirm against the macro definition.
 *
 * Without a task the request is sent with a blocking rpcSendRecv() to
 * pConn->mgmtEps and the reply is decoded into `out` by ctgProcessRspMsg().
 *
 * @param pCtg        catalog handle, forwarded to ctgAddBatch()/ctgAsyncSendMsg()
 * @param pConn       connection info; pTrans and mgmtEps drive the blocking call
 * @param pTableName  table whose configuration is requested
 * @param out         destination passed to ctgProcessRspMsg() on the
 *                    blocking path
 * @param pTask       owning task, or NULL to run synchronously
 * @return TSDB_CODE_SUCCESS after a processed blocking reply, otherwise the
 *         code reported by the failing step
 */
int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, STableCfg** out,
                                SCtgTask* pTask) {
  char*   msg = NULL;
  int32_t msgLen = 0;
  int32_t reqType = TDMT_MND_TABLE_CFG;
  char    tbFName[TSDB_TABLE_FNAME_LEN];
  tNameExtractFullName(pTableName, tbFName);
  void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
  char dbFName[TSDB_DB_FNAME_LEN];
  tNameGetFullDbName(pTableName, dbFName);
  SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = (char*)pTableName->tname};

  ctgDebug("try to get table cfg from mnode, tbFName:%s", tbFName);

  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp);
  if (code) {
    ctgError("Build get tb cfg msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName);
    CTG_ERR_RET(code);
  }

  if (pTask) {
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, NULL, (char*)tbFName));

#if CTG_BATCH_FETCH
    // Designated initializer zero-fills any member not named here.
    SCtgTaskReq tReq = {.pTask = pTask, .msgIdx = -1};
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
#else
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);

    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
#endif
  }

  SRpcMsg rpcMsg = {
      .msgType = reqType,
      .pCont = msg,
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);

  // Free the response buffer before propagating a processing error so it is
  // released on both the success and the failure path.
  code = ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName);
  rpcFreeCont(rpcRsp.pCont);
  CTG_ERR_RET(code);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1341 1342
/**
 * Fetch the server version from the mnode (TDMT_MND_SERVER_VERSION).
 *
 * With a task (pTask != NULL) the request type is recorded in the task's
 * message context and the message is handed to ctgAddBatch() (or
 * ctgAsyncSendMsg() when CTG_BATCH_FETCH is disabled).
 * NOTE(review): the statements after the task branch rely on CTG_RET always
 * returning to the caller; confirm against the macro definition.
 *
 * Without a task the request is sent with a blocking rpcSendRecv() to
 * pConn->mgmtEps and the reply is decoded into `out` by ctgProcessRspMsg().
 *
 * @param pCtg   catalog handle, forwarded to ctgAddBatch()/ctgAsyncSendMsg()
 * @param pConn  connection info; pTrans and mgmtEps drive the blocking call
 * @param out    destination passed to ctgProcessRspMsg() on the blocking path
 * @param pTask  owning task, or NULL to run synchronously
 * @return TSDB_CODE_SUCCESS after a processed blocking reply, otherwise the
 *         code reported by the failing step
 */
int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, char** out, SCtgTask* pTask) {
  char*   msg = NULL;
  int32_t msgLen = 0;
  int32_t reqType = TDMT_MND_SERVER_VERSION;
  void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;

  qDebug("try to get svr ver from mnode");

  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](NULL, &msg, 0, &msgLen, mallocFp);
  if (code) {
    ctgError("Build get svr ver msg failed, code:%s", tstrerror(code));
    CTG_ERR_RET(code);
  }

  if (pTask) {
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, NULL, NULL));

#if CTG_BATCH_FETCH
    // Designated initializer zero-fills any member not named here.
    SCtgTaskReq tReq = {.pTask = pTask, .msgIdx = -1};
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
#else
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);

    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
#endif
  }

  SRpcMsg rpcMsg = {
      .msgType = reqType,
      .pCont = msg,
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);

  // Free the response buffer before propagating a processing error so it is
  // released on both the success and the failure path.
  code = ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, NULL);
  rpcFreeCont(rpcRsp.pCont);
  CTG_ERR_RET(code);

  return TSDB_CODE_SUCCESS;
}