ctgRemote.c 43.6 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "catalogInt.h"
dengyihao's avatar
dengyihao 已提交
17
#include "query.h"
D
dapan1121 已提交
18
#include "systable.h"
dengyihao's avatar
dengyihao 已提交
19
#include "tname.h"
D
dapan1121 已提交
20
#include "tref.h"
dengyihao's avatar
dengyihao 已提交
21
#include "trpc.h"
D
dapan1121 已提交
22

dengyihao's avatar
dengyihao 已提交
23 24
int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBuf* pMsg, int32_t rspCode) {
  int32_t   code = 0;
25
  SCatalog* pCtg = pJob->pCtg;
D
dapan1121 已提交
26
  int32_t   taskNum = taosArrayGetSize(cbParam->taskId);
dengyihao's avatar
dengyihao 已提交
27 28
  SDataBuf  taskMsg = *pMsg;
  int32_t   offset = 0;
D
dapan1121 已提交
29
  int32_t msgNum = (TSDB_CODE_SUCCESS == rspCode && pMsg->pData && (pMsg->len > 0)) ? ntohl(*(int32_t*)pMsg->pData) : 0;
D
dapan1121 已提交
30 31
  ASSERT(taskNum == msgNum || 0 == msgNum);

dengyihao's avatar
dengyihao 已提交
32 33
  ctgDebug("QID:0x%" PRIx64 " ctg got batch %d rsp %s", pJob->queryId, cbParam->batchId,
           TMSG_INFO(cbParam->reqType + 1));
D
dapan1121 已提交
34 35 36 37 38 39 40 41

  offset += sizeof(msgNum);
  SBatchRsp rsp = {0};
  SHashObj* pBatchs = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (NULL == pBatchs) {
    ctgError("taosHashInit %d batch failed", taskNum);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }
dengyihao's avatar
dengyihao 已提交
42

D
dapan1121 已提交
43
  for (int32_t i = 0; i < taskNum; ++i) {
D
dapan1121 已提交
44 45
    int32_t*  taskId = taosArrayGet(cbParam->taskId, i);
    int32_t*  msgIdx = taosArrayGet(cbParam->msgIdx, i);
dengyihao's avatar
dengyihao 已提交
46
    SCtgTask* pTask = taosArrayGet(pJob->pTasks, *taskId);
D
dapan1121 已提交
47
    if (msgNum > 0) {
48
      rsp.reqType = ntohl(*(int32_t*)((char*)pMsg->pData + offset));
D
dapan1121 已提交
49
      offset += sizeof(rsp.reqType);
D
dapan1121 已提交
50 51
      rsp.msgIdx = ntohl(*(int32_t*)((char*)pMsg->pData + offset));
      offset += sizeof(rsp.msgIdx);
52
      rsp.msgLen = ntohl(*(int32_t*)((char*)pMsg->pData + offset));
D
dapan1121 已提交
53
      offset += sizeof(rsp.msgLen);
54
      rsp.rspCode = ntohl(*(int32_t*)((char*)pMsg->pData + offset));
D
dapan1121 已提交
55 56 57
      offset += sizeof(rsp.rspCode);
      rsp.msg = ((char*)pMsg->pData) + offset;
      offset += rsp.msgLen;
dengyihao's avatar
dengyihao 已提交
58

D
dapan1121 已提交
59 60 61
      taskMsg.msgType = rsp.reqType;
      taskMsg.pData = rsp.msg;
      taskMsg.len = rsp.msgLen;
D
dapan1121 已提交
62 63

      ASSERT(rsp.msgIdx == *msgIdx);
D
dapan1121 已提交
64
    } else {
D
dapan1121 已提交
65
      rsp.msgIdx = *msgIdx;
D
dapan1121 已提交
66 67 68 69 70
      rsp.reqType = -1;
      taskMsg.msgType = -1;
      taskMsg.pData = NULL;
      taskMsg.len = 0;
    }
dengyihao's avatar
dengyihao 已提交
71

D
dapan1121 已提交
72 73
    SCtgTaskReq tReq;
    tReq.pTask = pTask;
dengyihao's avatar
dengyihao 已提交
74
    tReq.msgIdx = rsp.msgIdx;
D
dapan1121 已提交
75 76
    SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq.msgIdx);
    pMsgCtx->pBatchs = pBatchs;
D
dapan1121 已提交
77

dengyihao's avatar
dengyihao 已提交
78 79
    ctgDebug("QID:0x%" PRIx64 " ctg task %d idx %d start to handle rsp %s, pBatchs: %p", pJob->queryId, pTask->taskId,
             rsp.msgIdx, TMSG_INFO(taskMsg.msgType + 1), pBatchs);
D
dapan1121 已提交
80 81

    (*gCtgAsyncFps[pTask->type].handleRspFp)(&tReq, rsp.reqType, &taskMsg, (rsp.rspCode ? rsp.rspCode : rspCode));
D
dapan1121 已提交
82 83 84 85 86 87 88 89 90 91
  }

  CTG_ERR_JRET(ctgLaunchBatchs(pJob->pCtg, pJob, pBatchs));

_return:

  ctgFreeBatchs(pBatchs);
  CTG_RET(code);
}

D
dapan1121 已提交
92 93
int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, int32_t rspCode, char* target) {
  int32_t code = 0;
dengyihao's avatar
dengyihao 已提交
94

D
dapan1121 已提交
95 96 97 98 99 100
  switch (reqType) {
    case TDMT_MND_QNODE_LIST: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for qnode list, error:%s", tstrerror(rspCode));
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
101

D
dapan1121 已提交
102 103 104 105 106
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process qnode list rsp failed, error:%s", tstrerror(rspCode));
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
107

D
dapan1121 已提交
108 109 110
      qDebug("Got qnode list from mnode, listNum:%d", (int32_t)taosArrayGetSize(out));
      break;
    }
D
dapan1121 已提交
111 112 113 114 115
    case TDMT_MND_DNODE_LIST: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for dnode list, error:%s", tstrerror(rspCode));
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
116

D
dapan1121 已提交
117 118 119 120 121
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process dnode list rsp failed, error:%s", tstrerror(rspCode));
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
122

D
dapan1121 已提交
123 124 125
      qDebug("Got dnode list from mnode, listNum:%d", (int32_t)taosArrayGetSize(*(SArray**)out));
      break;
    }
D
dapan1121 已提交
126 127 128 129 130
    case TDMT_MND_USE_DB: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for use db, error:%s, dbFName:%s", tstrerror(rspCode), target);
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
131

D
dapan1121 已提交
132 133 134 135 136
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process use db rsp failed, error:%s, dbFName:%s", tstrerror(code), target);
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
137

D
dapan1121 已提交
138 139 140 141 142 143 144 145
      qDebug("Got db vgInfo from mnode, dbFName:%s", target);
      break;
    }
    case TDMT_MND_GET_DB_CFG: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for get db cfg, error:%s, db:%s", tstrerror(rspCode), target);
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
146

D
dapan1121 已提交
147 148 149 150 151
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process get db cfg rsp failed, error:%s, db:%s", tstrerror(code), target);
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
152

D
dapan1121 已提交
153 154 155 156 157 158 159 160
      qDebug("Got db cfg from mnode, dbFName:%s", target);
      break;
    }
    case TDMT_MND_GET_INDEX: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for get index, error:%s, indexName:%s", tstrerror(rspCode), target);
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
161

D
dapan1121 已提交
162 163 164 165 166
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process get index rsp failed, error:%s, indexName:%s", tstrerror(code), target);
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
167

D
dapan1121 已提交
168 169 170
      qDebug("Got index from mnode, indexName:%s", target);
      break;
    }
D
dapan1121 已提交
171 172 173 174 175
    case TDMT_MND_GET_TABLE_INDEX: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for get table index, error:%s, tbFName:%s", tstrerror(rspCode), target);
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
176

D
dapan1121 已提交
177 178 179 180 181
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process get table index rsp failed, error:%s, tbFName:%s", tstrerror(code), target);
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
182

D
dapan1121 已提交
183 184 185
      qDebug("Got table index from mnode, tbFName:%s", target);
      break;
    }
D
dapan1121 已提交
186 187 188 189 190
    case TDMT_MND_RETRIEVE_FUNC: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for get udf, error:%s, funcName:%s", tstrerror(rspCode), target);
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
191

D
dapan1121 已提交
192 193 194 195 196
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process get udf rsp failed, error:%s, funcName:%s", tstrerror(code), target);
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
197

D
dapan1121 已提交
198 199 200 201 202 203 204 205
      qDebug("Got udf from mnode, funcName:%s", target);
      break;
    }
    case TDMT_MND_GET_USER_AUTH: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for get user auth, error:%s, user:%s", tstrerror(rspCode), target);
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
206

D
dapan1121 已提交
207 208 209 210 211
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process get user auth rsp failed, error:%s, user:%s", tstrerror(code), target);
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
212

D
dapan1121 已提交
213 214 215 216 217 218 219 220 221 222
      qDebug("Got user auth from mnode, user:%s", target);
      break;
    }
    case TDMT_MND_TABLE_META: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        if (CTG_TABLE_NOT_EXIST(rspCode)) {
          SET_META_TYPE_NULL(((STableMetaOutput*)out)->metaType);
          qDebug("stablemeta not exist in mnode, tbFName:%s", target);
          return TSDB_CODE_SUCCESS;
        }
dengyihao's avatar
dengyihao 已提交
223

D
dapan1121 已提交
224 225 226
        qError("error rsp for stablemeta from mnode, error:%s, tbFName:%s", tstrerror(rspCode), target);
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
227

D
dapan1121 已提交
228 229 230 231 232
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process mnode stablemeta rsp failed, error:%s, tbFName:%s", tstrerror(code), target);
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
233

D
dapan1121 已提交
234 235 236 237 238 239 240 241 242 243
      qDebug("Got table meta from mnode, tbFName:%s", target);
      break;
    }
    case TDMT_VND_TABLE_META: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        if (CTG_TABLE_NOT_EXIST(rspCode)) {
          SET_META_TYPE_NULL(((STableMetaOutput*)out)->metaType);
          qDebug("tablemeta not exist in vnode, tbFName:%s", target);
          return TSDB_CODE_SUCCESS;
        }
dengyihao's avatar
dengyihao 已提交
244

D
dapan1121 已提交
245 246 247
        qError("error rsp for table meta from vnode, code:%s, tbFName:%s", tstrerror(rspCode), target);
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
248

D
dapan1121 已提交
249 250 251 252 253
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process vnode tablemeta rsp failed, code:%s, tbFName:%s", tstrerror(code), target);
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
254

D
dapan1121 已提交
255 256 257
      qDebug("Got table meta from vnode, tbFName:%s", target);
      break;
    }
D
dapan1121 已提交
258 259 260 261 262
    case TDMT_VND_TABLE_CFG: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for table cfg from vnode, code:%s, tbFName:%s", tstrerror(rspCode), target);
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
263

D
dapan1121 已提交
264 265 266 267 268
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process vnode tb cfg rsp failed, code:%s, tbFName:%s", tstrerror(code), target);
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
269

D
dapan1121 已提交
270 271 272 273 274 275 276 277
      qDebug("Got table cfg from vnode, tbFName:%s", target);
      break;
    }
    case TDMT_MND_TABLE_CFG: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for stb cfg from mnode, error:%s, tbFName:%s", tstrerror(rspCode), target);
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
278

D
dapan1121 已提交
279 280 281 282 283
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process mnode stb cfg rsp failed, error:%s, tbFName:%s", tstrerror(code), target);
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
284

D
dapan1121 已提交
285 286
      qDebug("Got stb cfg from mnode, tbFName:%s", target);
      break;
dengyihao's avatar
dengyihao 已提交
287
    }
D
dapan1121 已提交
288 289 290 291 292
    case TDMT_MND_SERVER_VERSION: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for svr ver from mnode, error:%s", tstrerror(rspCode));
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
293

D
dapan1121 已提交
294 295 296 297 298
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process svr ver rsp failed, error:%s", tstrerror(code));
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
299

D
dapan1121 已提交
300 301 302
      qDebug("Got svr ver from mnode");
      break;
    }
D
dapan1121 已提交
303
    default:
D
dapan1121 已提交
304 305 306 307
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("Got error rsp, error:%s", tstrerror(rspCode));
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
308

D
dapan1121 已提交
309 310
      qError("invalid req type %s", TMSG_INFO(reqType));
      return TSDB_CODE_APP_ERROR;
D
dapan1121 已提交
311 312 313 314 315
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
316
int32_t ctgHandleMsgCallback(void* param, SDataBuf* pMsg, int32_t rspCode) {
D
dapan1121 已提交
317
  SCtgTaskCallbackParam* cbParam = (SCtgTaskCallbackParam*)param;
dengyihao's avatar
dengyihao 已提交
318 319 320
  int32_t                code = 0;
  SCtgJob*               pJob = NULL;

D
dapan1121 已提交
321
  CTG_API_JENTER();
D
dapan1121 已提交
322

D
dapan1121 已提交
323
  pJob = taosAcquireRef(gCtgMgmt.jobPool, cbParam->refId);
D
dapan1121 已提交
324
  if (NULL == pJob) {
D
dapan1121 已提交
325
    qDebug("ctg job refId 0x%" PRIx64 " already dropped", cbParam->refId);
D
dapan1121 已提交
326 327 328
    goto _return;
  }

329 330
  SCatalog* pCtg = pJob->pCtg;

D
dapan1121 已提交
331 332 333
  if (TDMT_VND_BATCH_META == cbParam->reqType || TDMT_MND_BATCH_META == cbParam->reqType) {
    CTG_ERR_JRET(ctgHandleBatchRsp(pJob, cbParam, pMsg, rspCode));
  } else {
dengyihao's avatar
dengyihao 已提交
334 335
    int32_t*  taskId = taosArrayGet(cbParam->taskId, 0);
    SCtgTask* pTask = taosArrayGet(pJob->pTasks, *taskId);
D
dapan1121 已提交
336

dengyihao's avatar
dengyihao 已提交
337 338
    qDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId,
           TMSG_INFO(cbParam->reqType + 1));
339 340

#if CTG_BATCH_FETCH
dengyihao's avatar
dengyihao 已提交
341 342
    SHashObj* pBatchs =
        taosHashInit(CTG_DEFAULT_BATCH_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
343 344
    if (NULL == pBatchs) {
      ctgError("taosHashInit %d batch failed", CTG_DEFAULT_BATCH_NUM);
345
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
346
    }
D
dapan1121 已提交
347

dengyihao's avatar
dengyihao 已提交
348
    SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
D
dapan1121 已提交
349
    pMsgCtx->pBatchs = pBatchs;
350 351
#endif

D
dapan1121 已提交
352 353 354
    SCtgTaskReq tReq;
    tReq.pTask = pTask;
    tReq.msgIdx = -1;
dengyihao's avatar
dengyihao 已提交
355

D
dapan1121 已提交
356
    CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].handleRspFp)(&tReq, cbParam->reqType, pMsg, rspCode));
357 358

#if CTG_BATCH_FETCH
dengyihao's avatar
dengyihao 已提交
359 360
    CTG_ERR_JRET(ctgLaunchBatchs(pJob->pCtg, pJob, pBatchs));
#endif
D
dapan1121 已提交
361
  }
dengyihao's avatar
dengyihao 已提交
362

D
dapan1121 已提交
363 364
_return:

D
dapan1121 已提交
365
  taosMemoryFree(pMsg->pData);
dengyihao's avatar
dengyihao 已提交
366
  taosMemoryFree(pMsg->pEpSet);
D
dapan1121 已提交
367

D
dapan1121 已提交
368 369 370 371 372 373 374
  if (pJob) {
    taosReleaseRef(gCtgMgmt.jobPool, cbParam->refId);
  }

  CTG_API_LEAVE(code);
}

D
dapan1121 已提交
375
int32_t ctgMakeMsgSendInfo(SCtgJob* pJob, SArray* pTaskId, int32_t batchId, SArray* pMsgIdx, int32_t msgType,
dengyihao's avatar
dengyihao 已提交
376
                           SMsgSendInfo** pMsgSendInfo) {
D
dapan1121 已提交
377
  int32_t       code = 0;
dengyihao's avatar
dengyihao 已提交
378
  SMsgSendInfo* msgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
D
dapan1121 已提交
379 380
  if (NULL == msgSendInfo) {
    qError("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
381
    CTG_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
D
dapan1121 已提交
382 383
  }

dengyihao's avatar
dengyihao 已提交
384
  SCtgTaskCallbackParam* param = taosMemoryCalloc(1, sizeof(SCtgTaskCallbackParam));
D
dapan1121 已提交
385 386 387 388 389 390
  if (NULL == param) {
    qError("calloc %d failed", (int32_t)sizeof(SCtgTaskCallbackParam));
    CTG_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  param->reqType = msgType;
D
dapan1121 已提交
391 392 393 394
  param->queryId = pJob->queryId;
  param->refId = pJob->refId;
  param->taskId = pTaskId;
  param->batchId = batchId;
D
dapan1121 已提交
395
  param->msgIdx = pMsgIdx;
D
dapan1121 已提交
396 397

  msgSendInfo->param = param;
398
  msgSendInfo->paramFreeFp = ctgFreeMsgSendParam;
D
dapan1121 已提交
399 400 401 402 403 404 405 406
  msgSendInfo->fp = ctgHandleMsgCallback;

  *pMsgSendInfo = msgSendInfo;

  return TSDB_CODE_SUCCESS;

_return:

407 408
  taosArrayDestroy(pTaskId);
  destroySendMsgInfo(msgSendInfo);
D
dapan1121 已提交
409 410 411 412

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
413
int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob* pJob, SArray* pTaskId, int32_t batchId,
D
dapan1121 已提交
414
                        SArray* pMsgIdx, char* dbFName, int32_t vgId, int32_t msgType, void* msg, uint32_t msgSize) {
dengyihao's avatar
dengyihao 已提交
415 416
  int32_t       code = 0;
  SMsgSendInfo* pMsgSendInfo = NULL;
D
dapan1121 已提交
417
  CTG_ERR_JRET(ctgMakeMsgSendInfo(pJob, pTaskId, batchId, pMsgIdx, msgType, &pMsgSendInfo));
D
dapan1121 已提交
418

D
dapan1121 已提交
419
  ctgUpdateSendTargetInfo(pMsgSendInfo, msgType, dbFName, vgId);
D
dapan1121 已提交
420 421 422

  pMsgSendInfo->requestId = pConn->requestId;
  pMsgSendInfo->requestObjRefId = pConn->requestObjRefId;
D
dapan1121 已提交
423 424 425 426 427 428
  pMsgSendInfo->msgInfo.pData = msg;
  pMsgSendInfo->msgInfo.len = msgSize;
  pMsgSendInfo->msgInfo.handle = NULL;
  pMsgSendInfo->msgType = msgType;

  int64_t transporterId = 0;
D
dapan1121 已提交
429
  code = asyncSendMsgToServer(pConn->pTrans, &pConn->mgmtEps, &transporterId, pMsgSendInfo);
430
  pMsgSendInfo = NULL;
D
dapan1121 已提交
431 432 433 434 435
  if (code) {
    ctgError("asyncSendMsgToSever failed, error: %s", tstrerror(code));
    CTG_ERR_JRET(code);
  }

D
dapan1121 已提交
436
  ctgDebug("ctg req msg sent, reqId:0x%" PRIx64 ", msg type:%d, %s", pJob->queryId, msgType, TMSG_INFO(msgType));
D
dapan1121 已提交
437 438 439 440 441
  return TSDB_CODE_SUCCESS;

_return:

  if (pMsgSendInfo) {
D
dapan1121 已提交
442 443 444 445 446 447
    destroySendMsgInfo(pMsgSendInfo);
  }

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
448 449 450 451
int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgTaskReq* tReq, int32_t msgType,
                    void* msg, uint32_t msgSize) {
  int32_t     code = 0;
  SCtgTask*   pTask = tReq->pTask;
D
dapan1121 已提交
452
  SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx);
dengyihao's avatar
dengyihao 已提交
453 454 455 456 457 458
  SHashObj*   pBatchs = pMsgCtx->pBatchs;
  SCtgJob*    pJob = pTask->pJob;
  SCtgBatch*  pBatch = taosHashGet(pBatchs, &vgId, sizeof(vgId));
  SCtgBatch   newBatch = {0};
  SBatchMsg   req = {0};

D
dapan1121 已提交
459
  if (NULL == pBatch) {
D
dapan1121 已提交
460 461
    newBatch.pMsgs = taosArrayInit(pJob->subTaskNum, sizeof(SBatchMsg));
    newBatch.pTaskIds = taosArrayInit(pJob->subTaskNum, sizeof(int32_t));
D
dapan1121 已提交
462 463
    newBatch.pMsgIdxs = taosArrayInit(pJob->subTaskNum, sizeof(int32_t));
    if (NULL == newBatch.pMsgs || NULL == newBatch.pTaskIds || NULL == newBatch.pMsgIdxs) {
D
dapan1121 已提交
464 465
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }
dengyihao's avatar
dengyihao 已提交
466

467
    newBatch.conn = *pConn;
D
dapan1121 已提交
468

D
dapan1121 已提交
469
    req.msgIdx = tReq->msgIdx;
D
dapan1121 已提交
470 471 472 473 474 475
    req.msgType = msgType;
    req.msgLen = msgSize;
    req.msg = msg;
    if (NULL == taosArrayPush(newBatch.pMsgs, &req)) {
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }
D
dapan1121 已提交
476
    msg = NULL;
D
dapan1121 已提交
477 478 479
    if (NULL == taosArrayPush(newBatch.pTaskIds, &pTask->taskId)) {
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }
D
dapan1121 已提交
480 481 482
    if (NULL == taosArrayPush(newBatch.pMsgIdxs, &req.msgIdx)) {
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }
483
    newBatch.msgSize = sizeof(SBatchReq) + sizeof(req) + msgSize - POINTER_BYTES;
D
dapan1121 已提交
484 485

    if (vgId > 0) {
D
dapan1121 已提交
486
      SName* pName = NULL;
D
dapan1121 已提交
487 488
      if (TDMT_VND_TABLE_CFG == msgType) {
        SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx;
D
dapan1121 已提交
489
        pName = ctx->pName;
D
dapan1121 已提交
490
      } else if (TDMT_VND_TABLE_META == msgType) {
D
dapan1121 已提交
491
        if (CTG_TASK_GET_TB_META_BATCH == pTask->type) {
492
          SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx;
dengyihao's avatar
dengyihao 已提交
493
          SCtgFetch*      fetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx);
494
          pName = ctgGetFetchName(ctx->pNames, fetch);
D
dapan1121 已提交
495 496 497 498
        } else {
          SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
          pName = ctx->pName;
        }
D
dapan1121 已提交
499 500 501 502
      } else {
        ctgError("invalid vnode msgType %d", msgType);
        CTG_ERR_JRET(TSDB_CODE_APP_ERROR);
      }
D
dapan1121 已提交
503 504

      tNameGetFullDbName(pName, newBatch.dbFName);
D
dapan1121 已提交
505 506 507 508 509 510 511 512 513
    }

    newBatch.msgType = (vgId > 0) ? TDMT_VND_BATCH_META : TDMT_MND_BATCH_META;
    newBatch.batchId = atomic_add_fetch_32(&pJob->batchId, 1);

    if (0 != taosHashPut(pBatchs, &vgId, sizeof(vgId), &newBatch, sizeof(newBatch))) {
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }

dengyihao's avatar
dengyihao 已提交
514 515
    ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), newBatch.batchId,
             vgId);
D
dapan1121 已提交
516 517 518 519

    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
520
  req.msgIdx = tReq->msgIdx;
D
dapan1121 已提交
521 522 523 524 525 526
  req.msgType = msgType;
  req.msgLen = msgSize;
  req.msg = msg;
  if (NULL == taosArrayPush(pBatch->pMsgs, &req)) {
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }
dengyihao's avatar
dengyihao 已提交
527
  msg = NULL;
D
dapan1121 已提交
528 529 530
  if (NULL == taosArrayPush(pBatch->pTaskIds, &pTask->taskId)) {
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }
D
dapan1121 已提交
531 532 533
  if (NULL == taosArrayPush(pBatch->pMsgIdxs, &req.msgIdx)) {
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }
dengyihao's avatar
dengyihao 已提交
534

D
dapan1121 已提交
535 536 537
  pBatch->msgSize += sizeof(req) + msgSize - POINTER_BYTES;

  if (vgId > 0) {
D
dapan1121 已提交
538
    SName* pName = NULL;
D
dapan1121 已提交
539 540
    if (TDMT_VND_TABLE_CFG == msgType) {
      SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx;
D
dapan1121 已提交
541
      pName = ctx->pName;
D
dapan1121 已提交
542
    } else if (TDMT_VND_TABLE_META == msgType) {
D
dapan1121 已提交
543
      if (CTG_TASK_GET_TB_META_BATCH == pTask->type) {
544
        SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx;
dengyihao's avatar
dengyihao 已提交
545
        SCtgFetch*      fetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx);
546
        pName = ctgGetFetchName(ctx->pNames, fetch);
D
dapan1121 已提交
547 548 549 550
      } else {
        SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
        pName = ctx->pName;
      }
D
dapan1121 已提交
551 552 553 554
    } else {
      ctgError("invalid vnode msgType %d", msgType);
      CTG_ERR_JRET(TSDB_CODE_APP_ERROR);
    }
D
dapan1121 已提交
555

dengyihao's avatar
dengyihao 已提交
556
    tNameGetFullDbName(pName, pBatch->dbFName);
D
dapan1121 已提交
557 558
  }

dengyihao's avatar
dengyihao 已提交
559 560
  ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), pBatch->batchId,
           vgId);
D
dapan1121 已提交
561 562 563 564 565 566 567

  return TSDB_CODE_SUCCESS;

_return:

  ctgFreeBatch(&newBatch);
  taosMemoryFree(msg);
dengyihao's avatar
dengyihao 已提交
568

D
dapan1121 已提交
569 570 571
  return code;
}

572
int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) {
D
dapan1121 已提交
573
  *msg = taosMemoryCalloc(1, pBatch->msgSize);
D
dapan1121 已提交
574 575 576 577
  if (NULL == (*msg)) {
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

dengyihao's avatar
dengyihao 已提交
578 579 580
  int32_t    offset = 0;
  int32_t    num = taosArrayGetSize(pBatch->pMsgs);
  SBatchReq* pBatchReq = (SBatchReq*)(*msg);
581 582 583 584

  pBatchReq->header.vgId = htonl(vgId);
  pBatchReq->msgNum = htonl(num);
  offset += sizeof(SBatchReq);
dengyihao's avatar
dengyihao 已提交
585

D
dapan1121 已提交
586 587
  for (int32_t i = 0; i < num; ++i) {
    SBatchMsg* pReq = taosArrayGet(pBatch->pMsgs, i);
D
dapan1121 已提交
588
    *(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgIdx);
dengyihao's avatar
dengyihao 已提交
589
    offset += sizeof(pReq->msgIdx);
D
dapan1121 已提交
590 591 592 593 594 595
    *(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgType);
    offset += sizeof(pReq->msgType);
    *(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgLen);
    offset += sizeof(pReq->msgLen);
    memcpy((char*)(*msg) + offset, pReq->msg, pReq->msgLen);
    offset += pReq->msgLen;
D
dapan1121 已提交
596 597
  }

D
dapan1121 已提交
598 599
  ASSERT(pBatch->msgSize == offset);

600 601
  qDebug("batch req %d to vg %d msg built with %d meta reqs", pBatch->batchId, vgId, num);

D
dapan1121 已提交
602 603 604
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
605
int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob* pJob, SHashObj* pBatchs) {
D
dapan1121 已提交
606
  int32_t code = 0;
dengyihao's avatar
dengyihao 已提交
607 608
  void*   msg = NULL;
  void*   p = taosHashIterate(pBatchs, NULL);
D
dapan1121 已提交
609
  while (NULL != p) {
dengyihao's avatar
dengyihao 已提交
610 611
    size_t     len = 0;
    int32_t*   vgId = taosHashGetKey(p, &len);
D
dapan1121 已提交
612
    SCtgBatch* pBatch = (SCtgBatch*)p;
613 614

    ctgDebug("QID:0x%" PRIx64 " ctg start to launch batch %d", pJob->queryId, pBatch->batchId);
dengyihao's avatar
dengyihao 已提交
615

616
    CTG_ERR_JRET(ctgBuildBatchReqMsg(pBatch, *vgId, &msg));
dengyihao's avatar
dengyihao 已提交
617
    code = ctgAsyncSendMsg(pCtg, &pBatch->conn, pJob, pBatch->pTaskIds, pBatch->batchId, pBatch->pMsgIdxs,
D
dapan1121 已提交
618
                           pBatch->dbFName, *vgId, pBatch->msgType, msg, pBatch->msgSize);
619 620
    pBatch->pTaskIds = NULL;
    CTG_ERR_JRET(code);
dengyihao's avatar
dengyihao 已提交
621

D
dapan1121 已提交
622 623 624 625 626 627 628 629 630 631 632
    p = taosHashIterate(pBatchs, p);
  }

  return TSDB_CODE_SUCCESS;

_return:

  if (p) {
    taosHashCancelIterate(pBatchs, p);
  }
  taosMemoryFree(msg);
dengyihao's avatar
dengyihao 已提交
633

D
dapan1121 已提交
634 635 636
  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
637 638
int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray* out, SCtgTask* pTask) {
  char*   msg = NULL;
D
dapan1121 已提交
639 640
  int32_t msgLen = 0;
  int32_t reqType = TDMT_MND_QNODE_LIST;
dengyihao's avatar
dengyihao 已提交
641
  void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
D
dapan1121 已提交
642

D
dapan1121 已提交
643
  ctgDebug("try to get qnode list from mnode, mgmtEpInUse:%d", pConn->mgmtEps.inUse);
D
dapan1121 已提交
644

D
dapan1121 已提交
645
  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](NULL, &msg, 0, &msgLen, mallocFp);
D
dapan1121 已提交
646 647 648 649 650
  if (code) {
    ctgError("Build qnode list msg failed, error:%s", tstrerror(code));
    CTG_ERR_RET(code);
  }

D
dapan1121 已提交
651 652 653 654 655
  if (pTask) {
    void* pOut = taosArrayInit(4, sizeof(SQueryNodeLoad));
    if (NULL == pOut) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
D
dapan1121 已提交
656

D
dapan1121 已提交
657
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, NULL));
D
dapan1121 已提交
658

659
#if CTG_BATCH_FETCH
D
dapan1121 已提交
660 661
    SCtgTaskReq tReq;
    tReq.pTask = pTask;
dengyihao's avatar
dengyihao 已提交
662
    tReq.msgIdx = -1;
D
dapan1121 已提交
663
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
664
#else
D
dapan1121 已提交
665 666 667 668 669
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);
dengyihao's avatar
dengyihao 已提交
670

D
dapan1121 已提交
671
    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
672
#endif
D
dapan1121 已提交
673
  }
dengyihao's avatar
dengyihao 已提交
674

D
dapan1121 已提交
675 676
  SRpcMsg rpcMsg = {
      .msgType = reqType,
dengyihao's avatar
dengyihao 已提交
677
      .pCont = msg,
D
dapan1121 已提交
678 679 680 681 682 683 684 685
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);

  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, NULL));

D
dapan1121 已提交
686 687
  rpcFreeCont(rpcRsp.pCont);

D
dapan1121 已提交
688 689 690
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
691 692
int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray** out, SCtgTask* pTask) {
  char*   msg = NULL;
D
dapan1121 已提交
693 694
  int32_t msgLen = 0;
  int32_t reqType = TDMT_MND_DNODE_LIST;
dengyihao's avatar
dengyihao 已提交
695
  void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
D
dapan1121 已提交
696 697 698 699 700 701 702 703 704

  ctgDebug("try to get dnode list from mnode, mgmtEpInUse:%d", pConn->mgmtEps.inUse);

  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](NULL, &msg, 0, &msgLen, mallocFp);
  if (code) {
    ctgError("Build dnode list msg failed, error:%s", tstrerror(code));
    CTG_ERR_RET(code);
  }

D
dapan1121 已提交
705
  if (pTask) {
D
dapan1121 已提交
706
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, NULL, NULL));
D
dapan1121 已提交
707

708
#if CTG_BATCH_FETCH
D
dapan1121 已提交
709 710
    SCtgTaskReq tReq;
    tReq.pTask = pTask;
dengyihao's avatar
dengyihao 已提交
711
    tReq.msgIdx = -1;
D
dapan1121 已提交
712
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
713
#else
D
dapan1121 已提交
714 715 716 717 718
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);
dengyihao's avatar
dengyihao 已提交
719

D
dapan1121 已提交
720
    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
721
#endif
D
dapan1121 已提交
722
  }
dengyihao's avatar
dengyihao 已提交
723

D
dapan1121 已提交
724 725
  SRpcMsg rpcMsg = {
      .msgType = reqType,
dengyihao's avatar
dengyihao 已提交
726
      .pCont = msg,
D
dapan1121 已提交
727 728 729 730
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
D
dapan1121 已提交
731
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
732 733 734

  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, NULL));

D
dapan1121 已提交
735 736
  rpcFreeCont(rpcRsp.pCont);

D
dapan1121 已提交
737 738 739
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
740
int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SBuildUseDBInput* input, SUseDbOutput* out,
D
dapan1121 已提交
741
                                SCtgTaskReq* tReq) {
dengyihao's avatar
dengyihao 已提交
742 743 744
  char*     msg = NULL;
  int32_t   msgLen = 0;
  int32_t   reqType = TDMT_MND_USE_DB;
D
dapan1121 已提交
745
  SCtgTask* pTask = tReq ? tReq->pTask : NULL;
dengyihao's avatar
dengyihao 已提交
746
  void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
D
dapan1121 已提交
747 748 749

  ctgDebug("try to get db vgInfo from mnode, dbFName:%s", input->db);

D
dapan1121 已提交
750
  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](input, &msg, 0, &msgLen, mallocFp);
D
dapan1121 已提交
751 752 753 754 755 756 757 758 759 760
  if (code) {
    ctgError("Build use db msg failed, code:%x, db:%s", code, input->db);
    CTG_ERR_RET(code);
  }

  if (pTask) {
    void* pOut = taosMemoryCalloc(1, sizeof(SUseDbOutput));
    if (NULL == pOut) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
D
dapan1121 已提交
761

D
dapan1121 已提交
762
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx), reqType, pOut, input->db));
D
dapan1121 已提交
763

764
#if CTG_BATCH_FETCH
D
dapan1121 已提交
765
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, tReq, reqType, msg, msgLen));
766
#else
D
dapan1121 已提交
767 768 769 770 771
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);
dengyihao's avatar
dengyihao 已提交
772

D
dapan1121 已提交
773
    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
774
#endif
D
dapan1121 已提交
775
  }
dengyihao's avatar
dengyihao 已提交
776

D
dapan1121 已提交
777 778
  SRpcMsg rpcMsg = {
      .msgType = reqType,
dengyihao's avatar
dengyihao 已提交
779
      .pCont = msg,
D
dapan1121 已提交
780 781 782 783
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
D
dapan1121 已提交
784
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
785 786

  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, input->db));
dengyihao's avatar
dengyihao 已提交
787

D
dapan1121 已提交
788
  rpcFreeCont(rpcRsp.pCont);
dengyihao's avatar
dengyihao 已提交
789

D
dapan1121 已提交
790 791 792
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
793 794 795
int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName, SDbCfgInfo* out,
                             SCtgTask* pTask) {
  char*   msg = NULL;
D
dapan1121 已提交
796 797
  int32_t msgLen = 0;
  int32_t reqType = TDMT_MND_GET_DB_CFG;
dengyihao's avatar
dengyihao 已提交
798
  void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
D
dapan1121 已提交
799 800 801

  ctgDebug("try to get db cfg from mnode, dbFName:%s", dbFName);

dengyihao's avatar
dengyihao 已提交
802
  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)dbFName, &msg, 0, &msgLen, mallocFp);
D
dapan1121 已提交
803 804 805 806 807 808 809 810 811 812
  if (code) {
    ctgError("Build get db cfg msg failed, code:%x, db:%s", code, dbFName);
    CTG_ERR_RET(code);
  }

  if (pTask) {
    void* pOut = taosMemoryCalloc(1, sizeof(SDbCfgInfo));
    if (NULL == pOut) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
D
dapan1121 已提交
813

D
dapan1121 已提交
814
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, (char*)dbFName));
D
dapan1121 已提交
815

816
#if CTG_BATCH_FETCH
D
dapan1121 已提交
817 818
    SCtgTaskReq tReq;
    tReq.pTask = pTask;
dengyihao's avatar
dengyihao 已提交
819
    tReq.msgIdx = -1;
D
dapan1121 已提交
820
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
821
#else
D
dapan1121 已提交
822 823 824 825 826
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);
dengyihao's avatar
dengyihao 已提交
827

D
dapan1121 已提交
828
    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
829
#endif
D
dapan1121 已提交
830
  }
dengyihao's avatar
dengyihao 已提交
831

D
dapan1121 已提交
832 833
  SRpcMsg rpcMsg = {
      .msgType = TDMT_MND_GET_DB_CFG,
dengyihao's avatar
dengyihao 已提交
834
      .pCont = msg,
D
dapan1121 已提交
835 836 837 838
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
D
dapan1121 已提交
839
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
840 841 842

  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)dbFName));

D
dapan1121 已提交
843 844
  rpcFreeCont(rpcRsp.pCont);

D
dapan1121 已提交
845 846 847
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
848 849 850
int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* indexName, SIndexInfo* out,
                                 SCtgTask* pTask) {
  char*   msg = NULL;
D
dapan1121 已提交
851 852
  int32_t msgLen = 0;
  int32_t reqType = TDMT_MND_GET_INDEX;
dengyihao's avatar
dengyihao 已提交
853
  void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
D
dapan1121 已提交
854 855 856

  ctgDebug("try to get index from mnode, indexName:%s", indexName);

dengyihao's avatar
dengyihao 已提交
857
  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)indexName, &msg, 0, &msgLen, mallocFp);
D
dapan1121 已提交
858 859 860 861 862 863 864 865 866 867
  if (code) {
    ctgError("Build get index msg failed, code:%x, db:%s", code, indexName);
    CTG_ERR_RET(code);
  }

  if (pTask) {
    void* pOut = taosMemoryCalloc(1, sizeof(SIndexInfo));
    if (NULL == pOut) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
D
dapan1121 已提交
868

D
dapan1121 已提交
869
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, (char*)indexName));
870 871

#if CTG_BATCH_FETCH
D
dapan1121 已提交
872 873
    SCtgTaskReq tReq;
    tReq.pTask = pTask;
dengyihao's avatar
dengyihao 已提交
874
    tReq.msgIdx = -1;
D
dapan1121 已提交
875
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
dengyihao's avatar
dengyihao 已提交
876
#else
D
dapan1121 已提交
877 878 879 880 881
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);
dengyihao's avatar
dengyihao 已提交
882

D
dapan1121 已提交
883
    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
884
#endif
D
dapan1121 已提交
885
  }
dengyihao's avatar
dengyihao 已提交
886

D
dapan1121 已提交
887 888
  SRpcMsg rpcMsg = {
      .msgType = reqType,
dengyihao's avatar
dengyihao 已提交
889
      .pCont = msg,
D
dapan1121 已提交
890 891 892 893
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
D
dapan1121 已提交
894
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
895 896

  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)indexName));
D
dapan1121 已提交
897 898

  rpcFreeCont(rpcRsp.pCont);
dengyihao's avatar
dengyihao 已提交
899

D
dapan1121 已提交
900 901 902
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
903 904 905
int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* name, STableIndex* out,
                               SCtgTask* pTask) {
  char*   msg = NULL;
D
dapan1121 已提交
906 907
  int32_t msgLen = 0;
  int32_t reqType = TDMT_MND_GET_TABLE_INDEX;
dengyihao's avatar
dengyihao 已提交
908
  void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
D
dapan1121 已提交
909 910
  char tbFName[TSDB_TABLE_FNAME_LEN];
  tNameExtractFullName(name, tbFName);
D
dapan1121 已提交
911 912 913

  ctgDebug("try to get tb index from mnode, tbFName:%s", tbFName);

dengyihao's avatar
dengyihao 已提交
914
  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)tbFName, &msg, 0, &msgLen, mallocFp);
D
dapan1121 已提交
915 916 917 918 919 920
  if (code) {
    ctgError("Build get index msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName);
    CTG_ERR_RET(code);
  }

  if (pTask) {
D
dapan1121 已提交
921 922 923 924
    void* pOut = taosMemoryCalloc(1, sizeof(STableIndex));
    if (NULL == pOut) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
dengyihao's avatar
dengyihao 已提交
925

D
dapan1121 已提交
926
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, (char*)tbFName));
927 928

#if CTG_BATCH_FETCH
D
dapan1121 已提交
929 930
    SCtgTaskReq tReq;
    tReq.pTask = pTask;
dengyihao's avatar
dengyihao 已提交
931
    tReq.msgIdx = -1;
D
dapan1121 已提交
932
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
dengyihao's avatar
dengyihao 已提交
933
#else
D
dapan1121 已提交
934 935 936 937 938
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);
dengyihao's avatar
dengyihao 已提交
939

D
dapan1121 已提交
940
    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
941
#endif
D
dapan1121 已提交
942
  }
dengyihao's avatar
dengyihao 已提交
943

D
dapan1121 已提交
944 945
  SRpcMsg rpcMsg = {
      .msgType = reqType,
dengyihao's avatar
dengyihao 已提交
946
      .pCont = msg,
D
dapan1121 已提交
947 948 949 950
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
D
dapan1121 已提交
951
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
952 953

  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName));
D
dapan1121 已提交
954 955

  rpcFreeCont(rpcRsp.pCont);
dengyihao's avatar
dengyihao 已提交
956

D
dapan1121 已提交
957 958 959
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
960 961 962
int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* funcName, SFuncInfo* out,
                               SCtgTask* pTask) {
  char*   msg = NULL;
D
dapan1121 已提交
963 964
  int32_t msgLen = 0;
  int32_t reqType = TDMT_MND_RETRIEVE_FUNC;
dengyihao's avatar
dengyihao 已提交
965
  void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
D
dapan1121 已提交
966 967 968

  ctgDebug("try to get udf info from mnode, funcName:%s", funcName);

dengyihao's avatar
dengyihao 已提交
969
  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)funcName, &msg, 0, &msgLen, mallocFp);
D
dapan1121 已提交
970 971 972 973 974 975 976 977 978 979
  if (code) {
    ctgError("Build get udf msg failed, code:%x, db:%s", code, funcName);
    CTG_ERR_RET(code);
  }

  if (pTask) {
    void* pOut = taosMemoryCalloc(1, sizeof(SFuncInfo));
    if (NULL == pOut) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
D
dapan1121 已提交
980

D
dapan1121 已提交
981
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, (char*)funcName));
982 983

#if CTG_BATCH_FETCH
D
dapan1121 已提交
984 985
    SCtgTaskReq tReq;
    tReq.pTask = pTask;
dengyihao's avatar
dengyihao 已提交
986
    tReq.msgIdx = -1;
D
dapan1121 已提交
987
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
988
#else
D
dapan1121 已提交
989 990 991 992 993
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);
dengyihao's avatar
dengyihao 已提交
994

D
dapan1121 已提交
995
    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
996
#endif
D
dapan1121 已提交
997
  }
dengyihao's avatar
dengyihao 已提交
998

D
dapan1121 已提交
999 1000
  SRpcMsg rpcMsg = {
      .msgType = reqType,
dengyihao's avatar
dengyihao 已提交
1001
      .pCont = msg,
D
dapan1121 已提交
1002 1003 1004 1005
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
D
dapan1121 已提交
1006
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
1007 1008 1009

  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)funcName));

D
dapan1121 已提交
1010 1011
  rpcFreeCont(rpcRsp.pCont);

D
dapan1121 已提交
1012 1013 1014
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1015 1016 1017
int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* user, SGetUserAuthRsp* out,
                                  SCtgTask* pTask) {
  char*   msg = NULL;
D
dapan1121 已提交
1018 1019
  int32_t msgLen = 0;
  int32_t reqType = TDMT_MND_GET_USER_AUTH;
dengyihao's avatar
dengyihao 已提交
1020
  void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
D
dapan1121 已提交
1021 1022 1023

  ctgDebug("try to get user auth from mnode, user:%s", user);

dengyihao's avatar
dengyihao 已提交
1024
  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)user, &msg, 0, &msgLen, mallocFp);
D
dapan1121 已提交
1025 1026 1027 1028 1029 1030 1031 1032 1033 1034
  if (code) {
    ctgError("Build get user auth msg failed, code:%x, db:%s", code, user);
    CTG_ERR_RET(code);
  }

  if (pTask) {
    void* pOut = taosMemoryCalloc(1, sizeof(SGetUserAuthRsp));
    if (NULL == pOut) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
D
dapan1121 已提交
1035

D
dapan1121 已提交
1036
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, (char*)user));
1037 1038

#if CTG_BATCH_FETCH
D
dapan1121 已提交
1039 1040
    SCtgTaskReq tReq;
    tReq.pTask = pTask;
dengyihao's avatar
dengyihao 已提交
1041
    tReq.msgIdx = -1;
D
dapan1121 已提交
1042
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
dengyihao's avatar
dengyihao 已提交
1043
#else
D
dapan1121 已提交
1044 1045 1046 1047 1048
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);
dengyihao's avatar
dengyihao 已提交
1049

D
dapan1121 已提交
1050
    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
1051
#endif
D
dapan1121 已提交
1052
  }
dengyihao's avatar
dengyihao 已提交
1053

D
dapan1121 已提交
1054 1055
  SRpcMsg rpcMsg = {
      .msgType = reqType,
dengyihao's avatar
dengyihao 已提交
1056
      .pCont = msg,
D
dapan1121 已提交
1057 1058 1059 1060
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
D
dapan1121 已提交
1061
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
1062 1063

  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)user));
D
dapan1121 已提交
1064 1065

  rpcFreeCont(rpcRsp.pCont);
dengyihao's avatar
dengyihao 已提交
1066

D
dapan1121 已提交
1067 1068 1069
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1070
int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo* pConn, char* dbFName, char* tbName,
D
dapan1121 已提交
1071
                                  STableMetaOutput* out, SCtgTaskReq* tReq) {
dengyihao's avatar
dengyihao 已提交
1072
  SCtgTask*        pTask = tReq ? tReq->pTask : NULL;
D
dapan1121 已提交
1073
  SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = tbName};
dengyihao's avatar
dengyihao 已提交
1074 1075 1076 1077 1078
  char*            msg = NULL;
  SEpSet*          pVnodeEpSet = NULL;
  int32_t          msgLen = 0;
  int32_t          reqType = TDMT_MND_TABLE_META;
  char             tbFName[TSDB_TABLE_FNAME_LEN];
D
dapan1121 已提交
1079
  sprintf(tbFName, "%s.%s", dbFName, tbName);
dengyihao's avatar
dengyihao 已提交
1080
  void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
D
dapan1121 已提交
1081 1082 1083

  ctgDebug("try to get table meta from mnode, tbFName:%s", tbFName);

D
dapan1121 已提交
1084
  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp);
D
dapan1121 已提交
1085 1086 1087 1088 1089 1090 1091 1092 1093 1094
  if (code) {
    ctgError("Build mnode stablemeta msg failed, code:%x", code);
    CTG_ERR_RET(code);
  }

  if (pTask) {
    void* pOut = taosMemoryCalloc(1, sizeof(STableMetaOutput));
    if (NULL == pOut) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
D
dapan1121 已提交
1095

D
dapan1121 已提交
1096
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx), reqType, pOut, tbFName));
dengyihao's avatar
dengyihao 已提交
1097

1098
#if CTG_BATCH_FETCH
D
dapan1121 已提交
1099
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, tReq, reqType, msg, msgLen));
dengyihao's avatar
dengyihao 已提交
1100
#else
D
dapan1121 已提交
1101 1102 1103 1104 1105
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);
dengyihao's avatar
dengyihao 已提交
1106

D
dapan1121 已提交
1107
    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
1108
#endif
D
dapan1121 已提交
1109 1110 1111 1112
  }

  SRpcMsg rpcMsg = {
      .msgType = reqType,
dengyihao's avatar
dengyihao 已提交
1113
      .pCont = msg,
D
dapan1121 已提交
1114 1115 1116 1117
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
D
dapan1121 已提交
1118
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);
dengyihao's avatar
dengyihao 已提交
1119

D
dapan1121 已提交
1120 1121
  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, tbFName));

D
dapan1121 已提交
1122 1123
  rpcFreeCont(rpcRsp.pCont);

D
dapan1121 已提交
1124 1125 1126
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1127
int32_t ctgGetTbMetaFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, STableMetaOutput* out,
D
dapan1121 已提交
1128
                              SCtgTaskReq* tReq) {
D
dapan1121 已提交
1129 1130 1131
  char dbFName[TSDB_DB_FNAME_LEN];
  tNameGetFullDbName(pTableName, dbFName);

D
dapan1121 已提交
1132
  return ctgGetTbMetaFromMnodeImpl(pCtg, pConn, dbFName, (char*)pTableName->tname, out, tReq);
D
dapan1121 已提交
1133 1134
}

dengyihao's avatar
dengyihao 已提交
1135
int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* vgroupInfo,
D
dapan1121 已提交
1136
                              STableMetaOutput* out, SCtgTaskReq* tReq) {
dengyihao's avatar
dengyihao 已提交
1137 1138
  SCtgTask* pTask = tReq ? tReq->pTask : NULL;
  char      dbFName[TSDB_DB_FNAME_LEN];
D
dapan1121 已提交
1139 1140
  tNameGetFullDbName(pTableName, dbFName);
  int32_t reqType = TDMT_VND_TABLE_META;
dengyihao's avatar
dengyihao 已提交
1141
  char    tbFName[TSDB_TABLE_FNAME_LEN];
D
dapan1121 已提交
1142
  sprintf(tbFName, "%s.%s", dbFName, pTableName->tname);
dengyihao's avatar
dengyihao 已提交
1143
  void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
D
dapan1121 已提交
1144

D
dapan1121 已提交
1145
  SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse];
dengyihao's avatar
dengyihao 已提交
1146 1147
  ctgDebug("try to get table meta from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s", vgroupInfo->vgId,
           vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName);
D
dapan1121 已提交
1148

dengyihao's avatar
dengyihao 已提交
1149 1150 1151
  SBuildTableInput bInput = {
      .vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char*)tNameGetTableName(pTableName)};
  char*   msg = NULL;
D
dapan1121 已提交
1152 1153
  int32_t msgLen = 0;

D
dapan1121 已提交
1154
  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp);
D
dapan1121 已提交
1155 1156 1157 1158 1159 1160 1161 1162 1163 1164
  if (code) {
    ctgError("Build vnode tablemeta msg failed, code:%x, tbFName:%s", code, tbFName);
    CTG_ERR_RET(code);
  }

  if (pTask) {
    void* pOut = taosMemoryCalloc(1, sizeof(STableMetaOutput));
    if (NULL == pOut) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
D
dapan1121 已提交
1165

dengyihao's avatar
dengyihao 已提交
1166 1167 1168 1169
    SRequestConnInfo vConn = {.pTrans = pConn->pTrans,
                              .requestId = pConn->requestId,
                              .requestObjRefId = pConn->requestObjRefId,
                              .mgmtEps = vgroupInfo->epSet};
D
dapan1121 已提交
1170

dengyihao's avatar
dengyihao 已提交
1171
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx), reqType, pOut, tbFName));
D
dapan1121 已提交
1172

D
dapan1121 已提交
1173
#if CTG_BATCH_FETCH
D
dapan1121 已提交
1174
    CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, tReq, reqType, msg, msgLen));
dengyihao's avatar
dengyihao 已提交
1175
#else
D
dapan1121 已提交
1176
    SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
dengyihao's avatar
dengyihao 已提交
1177
    char           dbFName[TSDB_DB_FNAME_LEN];
D
dapan1121 已提交
1178 1179 1180 1181 1182 1183 1184
    tNameGetFullDbName(ctx->pName, dbFName);
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);

D
dapan1121 已提交
1185
    CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, NULL, dbFName, ctx->vgId, reqType, msg, msgLen));
dengyihao's avatar
dengyihao 已提交
1186
#endif
D
dapan1121 已提交
1187 1188 1189 1190
  }

  SRpcMsg rpcMsg = {
      .msgType = reqType,
dengyihao's avatar
dengyihao 已提交
1191
      .pCont = msg,
D
dapan1121 已提交
1192 1193 1194 1195
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
D
dapan1121 已提交
1196
  rpcSendRecv(pConn->pTrans, &vgroupInfo->epSet, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
1197

dengyihao's avatar
dengyihao 已提交
1198
  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, tbFName));
D
dapan1121 已提交
1199

D
dapan1121 已提交
1200 1201
  rpcFreeCont(rpcRsp.pCont);

D
dapan1121 已提交
1202 1203 1204
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1205 1206 1207
int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName,
                                SVgroupInfo* vgroupInfo, STableCfg** out, SCtgTask* pTask) {
  char*   msg = NULL;
D
dapan1121 已提交
1208 1209
  int32_t msgLen = 0;
  int32_t reqType = TDMT_VND_TABLE_CFG;
dengyihao's avatar
dengyihao 已提交
1210
  char    tbFName[TSDB_TABLE_FNAME_LEN];
D
dapan1121 已提交
1211
  tNameExtractFullName(pTableName, tbFName);
dengyihao's avatar
dengyihao 已提交
1212
  void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
D
dapan1121 已提交
1213 1214
  char dbFName[TSDB_DB_FNAME_LEN];
  tNameGetFullDbName(pTableName, dbFName);
D
dapan1121 已提交
1215
  SBuildTableInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char*)pTableName->tname};
D
dapan1121 已提交
1216

D
dapan1121 已提交
1217
  SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse];
dengyihao's avatar
dengyihao 已提交
1218 1219
  ctgDebug("try to get table cfg from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s", vgroupInfo->vgId,
           vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName);
D
dapan1121 已提交
1220 1221 1222 1223 1224 1225 1226 1227

  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp);
  if (code) {
    ctgError("Build get tb cfg msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName);
    CTG_ERR_RET(code);
  }

  if (pTask) {
D
dapan1121 已提交
1228
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, NULL, (char*)tbFName));
D
dapan1121 已提交
1229

dengyihao's avatar
dengyihao 已提交
1230 1231 1232 1233
    SRequestConnInfo vConn = {.pTrans = pConn->pTrans,
                              .requestId = pConn->requestId,
                              .requestObjRefId = pConn->requestObjRefId,
                              .mgmtEps = vgroupInfo->epSet};
D
dapan1121 已提交
1234
#if CTG_BATCH_FETCH
D
dapan1121 已提交
1235 1236
    SCtgTaskReq tReq;
    tReq.pTask = pTask;
dengyihao's avatar
dengyihao 已提交
1237
    tReq.msgIdx = -1;
D
dapan1121 已提交
1238
    CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, &tReq, reqType, msg, msgLen));
D
dapan1121 已提交
1239 1240
#else
    SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx;
dengyihao's avatar
dengyihao 已提交
1241
    char          dbFName[TSDB_DB_FNAME_LEN];
D
dapan1121 已提交
1242 1243 1244 1245 1246 1247
    tNameGetFullDbName(ctx->pName, dbFName);
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);
dengyihao's avatar
dengyihao 已提交
1248

dengyihao's avatar
dengyihao 已提交
1249 1250
    CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, NULL, dbFName, ctx->pVgInfo->vgId, reqType, msg,
                            msgLen));
dengyihao's avatar
dengyihao 已提交
1251
#endif
D
dapan1121 已提交
1252
  }
dengyihao's avatar
dengyihao 已提交
1253

D
dapan1121 已提交
1254 1255
  SRpcMsg rpcMsg = {
      .msgType = reqType,
dengyihao's avatar
dengyihao 已提交
1256
      .pCont = msg,
D
dapan1121 已提交
1257 1258 1259 1260
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
D
dapan1121 已提交
1261
  rpcSendRecv(pConn->pTrans, &vgroupInfo->epSet, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
1262 1263

  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName));
D
dapan1121 已提交
1264 1265

  rpcFreeCont(rpcRsp.pCont);
dengyihao's avatar
dengyihao 已提交
1266

D
dapan1121 已提交
1267 1268 1269
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1270 1271 1272
int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, STableCfg** out,
                                SCtgTask* pTask) {
  char*   msg = NULL;
D
dapan1121 已提交
1273 1274
  int32_t msgLen = 0;
  int32_t reqType = TDMT_MND_TABLE_CFG;
dengyihao's avatar
dengyihao 已提交
1275
  char    tbFName[TSDB_TABLE_FNAME_LEN];
D
dapan1121 已提交
1276
  tNameExtractFullName(pTableName, tbFName);
dengyihao's avatar
dengyihao 已提交
1277
  void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
D
dapan1121 已提交
1278 1279
  char dbFName[TSDB_DB_FNAME_LEN];
  tNameGetFullDbName(pTableName, dbFName);
D
dapan1121 已提交
1280
  SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = (char*)pTableName->tname};
D
dapan1121 已提交
1281 1282 1283 1284 1285 1286 1287 1288 1289 1290

  ctgDebug("try to get table cfg from mnode, tbFName:%s", tbFName);

  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp);
  if (code) {
    ctgError("Build get tb cfg msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName);
    CTG_ERR_RET(code);
  }

  if (pTask) {
D
dapan1121 已提交
1291
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, NULL, (char*)tbFName));
D
dapan1121 已提交
1292

1293
#if CTG_BATCH_FETCH
D
dapan1121 已提交
1294 1295
    SCtgTaskReq tReq;
    tReq.pTask = pTask;
dengyihao's avatar
dengyihao 已提交
1296
    tReq.msgIdx = -1;
D
dapan1121 已提交
1297
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
dengyihao's avatar
dengyihao 已提交
1298
#else
D
dapan1121 已提交
1299 1300 1301 1302 1303
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);
dengyihao's avatar
dengyihao 已提交
1304

D
dapan1121 已提交
1305
    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
1306
#endif
D
dapan1121 已提交
1307
  }
dengyihao's avatar
dengyihao 已提交
1308

D
dapan1121 已提交
1309 1310
  SRpcMsg rpcMsg = {
      .msgType = reqType,
dengyihao's avatar
dengyihao 已提交
1311
      .pCont = msg,
D
dapan1121 已提交
1312 1313 1314 1315 1316 1317 1318
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);

  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName));
D
dapan1121 已提交
1319 1320

  rpcFreeCont(rpcRsp.pCont);
dengyihao's avatar
dengyihao 已提交
1321

D
dapan1121 已提交
1322 1323 1324
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1325 1326
int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, char** out, SCtgTask* pTask) {
  char*   msg = NULL;
D
dapan1121 已提交
1327 1328
  int32_t msgLen = 0;
  int32_t reqType = TDMT_MND_SERVER_VERSION;
dengyihao's avatar
dengyihao 已提交
1329
  void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
D
dapan1121 已提交
1330 1331 1332 1333 1334 1335 1336 1337 1338 1339

  qDebug("try to get svr ver from mnode");

  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](NULL, &msg, 0, &msgLen, mallocFp);
  if (code) {
    ctgError("Build get svr ver msg failed, code:%s", tstrerror(code));
    CTG_ERR_RET(code);
  }

  if (pTask) {
D
dapan1121 已提交
1340
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, NULL, NULL));
1341 1342

#if CTG_BATCH_FETCH
D
dapan1121 已提交
1343 1344
    SCtgTaskReq tReq;
    tReq.pTask = pTask;
dengyihao's avatar
dengyihao 已提交
1345
    tReq.msgIdx = -1;
D
dapan1121 已提交
1346
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
dengyihao's avatar
dengyihao 已提交
1347
#else
D
dapan1121 已提交
1348 1349 1350 1351 1352
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);
dengyihao's avatar
dengyihao 已提交
1353

D
dapan1121 已提交
1354
    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
1355
#endif
D
dapan1121 已提交
1356
  }
dengyihao's avatar
dengyihao 已提交
1357

D
dapan1121 已提交
1358 1359
  SRpcMsg rpcMsg = {
      .msgType = reqType,
dengyihao's avatar
dengyihao 已提交
1360
      .pCont = msg,
D
dapan1121 已提交
1361 1362 1363 1364 1365 1366 1367
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);

  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, NULL));
D
dapan1121 已提交
1368 1369

  rpcFreeCont(rpcRsp.pCont);
dengyihao's avatar
dengyihao 已提交
1370

D
dapan1121 已提交
1371 1372
  return TSDB_CODE_SUCCESS;
}