ctgRemote.c 43.3 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "catalogInt.h"
dengyihao's avatar
dengyihao 已提交
17
#include "query.h"
D
dapan1121 已提交
18
#include "systable.h"
dengyihao's avatar
dengyihao 已提交
19
#include "tname.h"
D
dapan1121 已提交
20
#include "tref.h"
dengyihao's avatar
dengyihao 已提交
21
#include "trpc.h"
D
dapan1121 已提交
22
#include "ctgRemote.h"
D
dapan1121 已提交
23

dengyihao's avatar
dengyihao 已提交
24 25
// Fan one batch response out to every task that contributed a request to the batch.
//
// pJob     job owning the tasks referenced by cbParam->taskId
// cbParam  callback parameter carrying the parallel taskId / msgIdx arrays and the batch id
// pMsg     raw response buffer; it is only decoded when rspCode is success and it has a payload
// rspCode  transport-level response code; used for a task whenever its own sub-response code is 0
//
// When the batch response was decoded, task i is served from sub-response i. Otherwise every task is
// handed an empty message with request type -1. Requests queued by the task handlers are collected in a
// temporary batch hash and launched before returning.
//
// Returns TSDB_CODE_SUCCESS or the first error hit while decoding, allocating or launching batches.
// The return value of each task handler is intentionally not inspected here.
int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBuf* pMsg, int32_t rspCode) {
  int32_t       code = 0;
  SCatalog*     pCtg = pJob->pCtg;
  int32_t       taskNum = taosArrayGetSize(cbParam->taskId);
  SDataBuf      taskMsg = *pMsg;
  int32_t       msgNum = 0;
  SBatchRsp     batchRsp = {0};
  SBatchRspMsg  rsp = {0};
  SBatchRspMsg* pRsp = NULL;
  // Declared (and NULL-initialised) before the first possible jump to _return so the cleanup code
  // never reads an indeterminate pointer.
  SHashObj*     pBatchs = NULL;

  if (TSDB_CODE_SUCCESS == rspCode && pMsg->pData && (pMsg->len > 0)) {
    if (tDeserializeSBatchRsp(pMsg->pData, pMsg->len, &batchRsp) < 0) {
      ctgError("tDeserializeSBatchRsp failed, msgLen:%d", pMsg->len);
      // Go through _return so anything the decoder already attached to batchRsp is released.
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }

    msgNum = taosArrayGetSize(batchRsp.pRsps);
  }

  // Either every task has a matching sub-response, or there are none at all.
  tAssert(taskNum == msgNum || 0 == msgNum);

  ctgDebug("QID:0x%" PRIx64 " ctg got batch %d rsp %s", pJob->queryId, cbParam->batchId,
           TMSG_INFO(cbParam->reqType + 1));

  pBatchs = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (NULL == pBatchs) {
    ctgError("taosHashInit %d batch failed", taskNum);
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  for (int32_t i = 0; i < taskNum; ++i) {
    int32_t*  taskId = taosArrayGet(cbParam->taskId, i);
    int32_t*  msgIdx = taosArrayGet(cbParam->msgIdx, i);
    SCtgTask* pTask = taosArrayGet(pJob->pTasks, *taskId);

    if (msgNum > 0) {
      // Sub-responses are consumed positionally: entry i belongs to task i of the batch.
      pRsp = taosArrayGet(batchRsp.pRsps, i);

      taskMsg.msgType = pRsp->reqType;
      taskMsg.pData = pRsp->msg;
      taskMsg.len = pRsp->msgLen;

      tAssert(pRsp->msgIdx == *msgIdx);
    } else {
      // No decoded payload: synthesise an empty sub-response so the handler still runs.
      pRsp = &rsp;
      pRsp->msgIdx = *msgIdx;
      pRsp->reqType = -1;
      pRsp->rspCode = 0;
      taskMsg.msgType = -1;
      taskMsg.pData = NULL;
      taskMsg.len = 0;
    }

    SCtgTaskReq tReq;
    tReq.pTask = pTask;
    tReq.msgIdx = pRsp->msgIdx;

    // Requests issued by the handler are gathered into pBatchs through the task's message context.
    SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq.msgIdx);
    pMsgCtx->pBatchs = pBatchs;

    ctgDebug("QID:0x%" PRIx64 " ctg task %d idx %d start to handle rsp %s, pBatchs: %p", pJob->queryId, pTask->taskId,
             pRsp->msgIdx, TMSG_INFO(taskMsg.msgType + 1), pBatchs);

    // A non-zero per-message code takes precedence over the transport-level code.
    (*gCtgAsyncFps[pTask->type].handleRspFp)(&tReq, pRsp->reqType, &taskMsg, (pRsp->rspCode ? pRsp->rspCode : rspCode));
  }

  CTG_ERR_JRET(ctgLaunchBatchs(pJob->pCtg, pJob, pBatchs));

_return:

  taosArrayDestroyEx(batchRsp.pRsps, tFreeSBatchRspMsg);

  ctgFreeBatchs(pBatchs);
  CTG_RET(code);
}

D
dapan1121 已提交
98 99
int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, int32_t rspCode, char* target) {
  int32_t code = 0;
dengyihao's avatar
dengyihao 已提交
100

D
dapan1121 已提交
101 102 103 104 105 106
  switch (reqType) {
    case TDMT_MND_QNODE_LIST: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for qnode list, error:%s", tstrerror(rspCode));
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
107

D
dapan1121 已提交
108 109 110 111 112
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process qnode list rsp failed, error:%s", tstrerror(rspCode));
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
113

D
dapan1121 已提交
114 115 116
      qDebug("Got qnode list from mnode, listNum:%d", (int32_t)taosArrayGetSize(out));
      break;
    }
D
dapan1121 已提交
117 118 119 120 121
    case TDMT_MND_DNODE_LIST: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for dnode list, error:%s", tstrerror(rspCode));
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
122

D
dapan1121 已提交
123 124 125 126 127
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process dnode list rsp failed, error:%s", tstrerror(rspCode));
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
128

D
dapan1121 已提交
129 130 131
      qDebug("Got dnode list from mnode, listNum:%d", (int32_t)taosArrayGetSize(*(SArray**)out));
      break;
    }
D
dapan1121 已提交
132 133 134 135 136
    case TDMT_MND_USE_DB: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for use db, error:%s, dbFName:%s", tstrerror(rspCode), target);
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
137

D
dapan1121 已提交
138 139 140 141 142
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process use db rsp failed, error:%s, dbFName:%s", tstrerror(code), target);
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
143

D
dapan1121 已提交
144 145 146 147 148 149 150 151
      qDebug("Got db vgInfo from mnode, dbFName:%s", target);
      break;
    }
    case TDMT_MND_GET_DB_CFG: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for get db cfg, error:%s, db:%s", tstrerror(rspCode), target);
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
152

D
dapan1121 已提交
153 154 155 156 157
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process get db cfg rsp failed, error:%s, db:%s", tstrerror(code), target);
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
158

D
dapan1121 已提交
159 160 161 162 163 164 165 166
      qDebug("Got db cfg from mnode, dbFName:%s", target);
      break;
    }
    case TDMT_MND_GET_INDEX: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for get index, error:%s, indexName:%s", tstrerror(rspCode), target);
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
167

D
dapan1121 已提交
168 169 170 171 172
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process get index rsp failed, error:%s, indexName:%s", tstrerror(code), target);
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
173

D
dapan1121 已提交
174 175 176
      qDebug("Got index from mnode, indexName:%s", target);
      break;
    }
D
dapan1121 已提交
177 178 179 180 181
    case TDMT_MND_GET_TABLE_INDEX: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for get table index, error:%s, tbFName:%s", tstrerror(rspCode), target);
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
182

D
dapan1121 已提交
183 184 185 186 187
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process get table index rsp failed, error:%s, tbFName:%s", tstrerror(code), target);
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
188

D
dapan1121 已提交
189 190 191
      qDebug("Got table index from mnode, tbFName:%s", target);
      break;
    }
D
dapan1121 已提交
192 193 194 195 196
    case TDMT_MND_RETRIEVE_FUNC: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for get udf, error:%s, funcName:%s", tstrerror(rspCode), target);
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
197

D
dapan1121 已提交
198 199 200 201 202
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process get udf rsp failed, error:%s, funcName:%s", tstrerror(code), target);
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
203

D
dapan1121 已提交
204 205 206 207 208 209 210 211
      qDebug("Got udf from mnode, funcName:%s", target);
      break;
    }
    case TDMT_MND_GET_USER_AUTH: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for get user auth, error:%s, user:%s", tstrerror(rspCode), target);
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
212

D
dapan1121 已提交
213 214 215 216 217
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process get user auth rsp failed, error:%s, user:%s", tstrerror(code), target);
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
218

D
dapan1121 已提交
219 220 221 222 223 224 225 226 227 228
      qDebug("Got user auth from mnode, user:%s", target);
      break;
    }
    case TDMT_MND_TABLE_META: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        if (CTG_TABLE_NOT_EXIST(rspCode)) {
          SET_META_TYPE_NULL(((STableMetaOutput*)out)->metaType);
          qDebug("stablemeta not exist in mnode, tbFName:%s", target);
          return TSDB_CODE_SUCCESS;
        }
dengyihao's avatar
dengyihao 已提交
229

D
dapan1121 已提交
230 231 232
        qError("error rsp for stablemeta from mnode, error:%s, tbFName:%s", tstrerror(rspCode), target);
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
233

D
dapan1121 已提交
234 235 236 237 238
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process mnode stablemeta rsp failed, error:%s, tbFName:%s", tstrerror(code), target);
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
239

D
dapan1121 已提交
240 241 242 243 244 245 246 247 248 249
      qDebug("Got table meta from mnode, tbFName:%s", target);
      break;
    }
    case TDMT_VND_TABLE_META: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        if (CTG_TABLE_NOT_EXIST(rspCode)) {
          SET_META_TYPE_NULL(((STableMetaOutput*)out)->metaType);
          qDebug("tablemeta not exist in vnode, tbFName:%s", target);
          return TSDB_CODE_SUCCESS;
        }
dengyihao's avatar
dengyihao 已提交
250

D
dapan1121 已提交
251 252 253
        qError("error rsp for table meta from vnode, code:%s, tbFName:%s", tstrerror(rspCode), target);
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
254

D
dapan1121 已提交
255 256 257 258 259
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process vnode tablemeta rsp failed, code:%s, tbFName:%s", tstrerror(code), target);
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
260

D
dapan1121 已提交
261 262 263
      qDebug("Got table meta from vnode, tbFName:%s", target);
      break;
    }
D
dapan1121 已提交
264 265 266 267 268
    case TDMT_VND_TABLE_CFG: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for table cfg from vnode, code:%s, tbFName:%s", tstrerror(rspCode), target);
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
269

D
dapan1121 已提交
270 271 272 273 274
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process vnode tb cfg rsp failed, code:%s, tbFName:%s", tstrerror(code), target);
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
275

D
dapan1121 已提交
276 277 278 279 280 281 282 283
      qDebug("Got table cfg from vnode, tbFName:%s", target);
      break;
    }
    case TDMT_MND_TABLE_CFG: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for stb cfg from mnode, error:%s, tbFName:%s", tstrerror(rspCode), target);
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
284

D
dapan1121 已提交
285 286 287 288 289
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process mnode stb cfg rsp failed, error:%s, tbFName:%s", tstrerror(code), target);
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
290

D
dapan1121 已提交
291 292
      qDebug("Got stb cfg from mnode, tbFName:%s", target);
      break;
dengyihao's avatar
dengyihao 已提交
293
    }
D
dapan1121 已提交
294 295 296 297 298
    case TDMT_MND_SERVER_VERSION: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for svr ver from mnode, error:%s", tstrerror(rspCode));
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
299

D
dapan1121 已提交
300 301 302 303 304
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process svr ver rsp failed, error:%s", tstrerror(code));
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
305

D
dapan1121 已提交
306 307 308
      qDebug("Got svr ver from mnode");
      break;
    }
D
dapan1121 已提交
309
    default:
D
dapan1121 已提交
310 311 312 313
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("Got error rsp, error:%s", tstrerror(rspCode));
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
314

D
dapan1121 已提交
315 316
      qError("invalid req type %s", TMSG_INFO(reqType));
      return TSDB_CODE_APP_ERROR;
D
dapan1121 已提交
317 318 319 320 321
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
322
// Response callback installed on every catalog request (see msgSendInfo->fp in ctgMakeMsgSendInfo).
//
// param    the SCtgTaskCallbackParam attached to the request
// pMsg     response buffer; pMsg->pData and pMsg->pEpSet are always freed before returning
// rspCode  response code delivered with the message
//
// The job is looked up through its ref id; if it is already gone the response is simply discarded.
// Batch responses are forwarded to ctgHandleBatchRsp, any other response is handed to the handler of the
// single task referenced by cbParam->taskId[0]. With CTG_BATCH_FETCH enabled, requests the handler queues
// are gathered in a temporary batch hash, launched, and the hash is released at _return (mirroring what
// ctgHandleBatchRsp does with its own hash).
int32_t ctgHandleMsgCallback(void* param, SDataBuf* pMsg, int32_t rspCode) {
  SCtgTaskCallbackParam* cbParam = (SCtgTaskCallbackParam*)param;
  int32_t                code = 0;
  SCtgJob*               pJob = NULL;
#if CTG_BATCH_FETCH
  // Function scope + NULL so that every path reaching _return can release it safely.
  SHashObj*              pBatchs = NULL;
#endif

  CTG_API_JENTER();

  pJob = taosAcquireRef(gCtgMgmt.jobPool, cbParam->refId);
  if (NULL == pJob) {
    qDebug("ctg job refId 0x%" PRIx64 " already dropped", cbParam->refId);
    goto _return;
  }

  SCatalog* pCtg = pJob->pCtg;

  if (TDMT_VND_BATCH_META == cbParam->reqType || TDMT_MND_BATCH_META == cbParam->reqType) {
    CTG_ERR_JRET(ctgHandleBatchRsp(pJob, cbParam, pMsg, rspCode));
  } else {
    int32_t*  taskId = taosArrayGet(cbParam->taskId, 0);
    SCtgTask* pTask = taosArrayGet(pJob->pTasks, *taskId);

    qDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId,
           TMSG_INFO(cbParam->reqType + 1));

#if CTG_BATCH_FETCH
    pBatchs =
        taosHashInit(CTG_DEFAULT_BATCH_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
    if (NULL == pBatchs) {
      ctgError("taosHashInit %d batch failed", CTG_DEFAULT_BATCH_NUM);
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }

    SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
    pMsgCtx->pBatchs = pBatchs;
#endif

    SCtgTaskReq tReq;
    tReq.pTask = pTask;
    tReq.msgIdx = -1;

    CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].handleRspFp)(&tReq, cbParam->reqType, pMsg, rspCode));

#if CTG_BATCH_FETCH
    CTG_ERR_JRET(ctgLaunchBatchs(pJob->pCtg, pJob, pBatchs));
#endif
  }

_return:

#if CTG_BATCH_FETCH
  // Previously this hash was never released on the non-batch path.
  ctgFreeBatchs(pBatchs);
#endif

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  if (pJob) {
    taosReleaseRef(gCtgMgmt.jobPool, cbParam->refId);
  }

  CTG_API_LEAVE(code);
}

D
dapan1121 已提交
381
int32_t ctgMakeMsgSendInfo(SCtgJob* pJob, SArray* pTaskId, int32_t batchId, SArray* pMsgIdx, int32_t msgType,
dengyihao's avatar
dengyihao 已提交
382
                           SMsgSendInfo** pMsgSendInfo) {
D
dapan1121 已提交
383
  int32_t       code = 0;
dengyihao's avatar
dengyihao 已提交
384
  SMsgSendInfo* msgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
D
dapan1121 已提交
385 386
  if (NULL == msgSendInfo) {
    qError("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
S
Shengliang Guan 已提交
387
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
388 389
  }

dengyihao's avatar
dengyihao 已提交
390
  SCtgTaskCallbackParam* param = taosMemoryCalloc(1, sizeof(SCtgTaskCallbackParam));
D
dapan1121 已提交
391 392
  if (NULL == param) {
    qError("calloc %d failed", (int32_t)sizeof(SCtgTaskCallbackParam));
S
Shengliang Guan 已提交
393
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
D
dapan1121 已提交
394 395 396
  }

  param->reqType = msgType;
D
dapan1121 已提交
397 398 399 400
  param->queryId = pJob->queryId;
  param->refId = pJob->refId;
  param->taskId = pTaskId;
  param->batchId = batchId;
D
dapan1121 已提交
401
  param->msgIdx = pMsgIdx;
D
dapan1121 已提交
402 403

  msgSendInfo->param = param;
404
  msgSendInfo->paramFreeFp = ctgFreeMsgSendParam;
D
dapan1121 已提交
405 406 407 408 409 410 411 412
  msgSendInfo->fp = ctgHandleMsgCallback;

  *pMsgSendInfo = msgSendInfo;

  return TSDB_CODE_SUCCESS;

_return:

413 414
  taosArrayDestroy(pTaskId);
  destroySendMsgInfo(msgSendInfo);
D
dapan1121 已提交
415 416 417 418

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
419
// Build the send-info for a catalog request and hand it to asyncSendMsgToServer.
//
// pCtg      catalog handle (referenced by the ctg log macros)
// pConn     connection info: transport handle, mgmt endpoint set, request ids
// pJob      job issuing the request
// pTaskId   task ids served by this request (passed on to ctgMakeMsgSendInfo)
// batchId   batch identifier
// pMsgIdx   message indexes matching pTaskId
// dbFName   db name passed to ctgUpdateSendTargetInfo
// vgId      vgroup id passed to ctgUpdateSendTargetInfo
// msgType   request type
// msg       request payload, attached to the send-info as msgInfo.pData
// msgSize   payload length in bytes
//
// Returns TSDB_CODE_SUCCESS, or the error reported by ctgMakeMsgSendInfo / asyncSendMsgToServer.
int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob* pJob, SArray* pTaskId, int32_t batchId,
                        SArray* pMsgIdx, char* dbFName, int32_t vgId, int32_t msgType, void* msg, uint32_t msgSize) {
  int32_t       code = 0;
  int64_t       transporterId = 0;
  SMsgSendInfo* pSendInfo = NULL;

  CTG_ERR_JRET(ctgMakeMsgSendInfo(pJob, pTaskId, batchId, pMsgIdx, msgType, &pSendInfo));

  ctgUpdateSendTargetInfo(pSendInfo, msgType, dbFName, vgId);

  pSendInfo->msgType = msgType;
  pSendInfo->msgInfo.handle = NULL;
  pSendInfo->msgInfo.len = msgSize;
  pSendInfo->msgInfo.pData = msg;
  pSendInfo->requestObjRefId = pConn->requestObjRefId;
  pSendInfo->requestId = pConn->requestId;

  code = asyncSendMsgToServer(pConn->pTrans, &pConn->mgmtEps, &transporterId, pSendInfo);
  // The send-info is not touched again after the call, whatever its result.
  pSendInfo = NULL;
  if (code) {
    ctgError("asyncSendMsgToSever failed, error: %s", tstrerror(code));
    CTG_ERR_JRET(code);
  }

  ctgDebug("ctg req msg sent, reqId:0x%" PRIx64 ", msg type:%d, %s", pJob->queryId, msgType, TMSG_INFO(msgType));
  return TSDB_CODE_SUCCESS;

_return:

  if (pSendInfo) {
    destroySendMsgInfo(pSendInfo);
  }

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
454 455 456 457
// Queue one request into the per-vgroup batch of the task's current batch hash.
//
// pCtg     catalog handle (referenced by the ctg log macros)
// vgId     key of the batch; values > 0 are treated as vnode targets and get a db name resolved,
//          anything else produces an mnode batch
// pConn    connection info copied into a newly created batch
// tReq     task request: the task plus the message index inside that task
// msgType  type of the queued request
// msg      request payload; once it has been appended to the batch it is no longer freed here
// msgSize  payload length in bytes
//
// A batch is created and inserted into the hash the first time a vgId is seen; later requests for the
// same vgId are appended to the existing batch.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY on allocation/insert failure, or TSDB_CODE_APP_ERROR
// for a vnode target with an unsupported msgType.
int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgTaskReq* tReq, int32_t msgType,
                    void* msg, uint32_t msgSize) {
  int32_t     code = 0;
  SCtgTask*   pTask = tReq->pTask;
  SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx);
  SHashObj*   pBatchs = pMsgCtx->pBatchs;
  SCtgJob*    pJob = pTask->pJob;
  SCtgBatch*  pBatch = taosHashGet(pBatchs, &vgId, sizeof(vgId));
  SCtgBatch   newBatch = {0};
  SBatchMsg   req = {0};
  // All appends below go through pTarget: the batch found in the hash, or the local one being built.
  SCtgBatch*  pTarget = (NULL != pBatch) ? pBatch : &newBatch;

  if (NULL == pBatch) {
    newBatch.pMsgs = taosArrayInit(pJob->subTaskNum, sizeof(SBatchMsg));
    newBatch.pTaskIds = taosArrayInit(pJob->subTaskNum, sizeof(int32_t));
    newBatch.pMsgIdxs = taosArrayInit(pJob->subTaskNum, sizeof(int32_t));
    if (NULL == newBatch.pMsgs || NULL == newBatch.pTaskIds || NULL == newBatch.pMsgIdxs) {
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }

    newBatch.conn = *pConn;
  }

  req.msgIdx = tReq->msgIdx;
  req.msgType = msgType;
  req.msgLen = msgSize;
  req.msg = msg;
  if (NULL == taosArrayPush(pTarget->pMsgs, &req)) {
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }
  // The payload now lives in the batch's message array; clear the local so _return does not free it.
  msg = NULL;

  if (NULL == taosArrayPush(pTarget->pTaskIds, &pTask->taskId)) {
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }
  if (NULL == taosArrayPush(pTarget->pMsgIdxs, &req.msgIdx)) {
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }

  if (vgId > 0) {
    // Vnode target: derive the batch's db name from the table name the task is working on.
    SName* pName = NULL;

    if (TDMT_VND_TABLE_CFG == msgType) {
      pName = ((SCtgTbCfgCtx*)pTask->taskCtx)->pName;
    } else if (TDMT_VND_TABLE_META == msgType) {
      if (CTG_TASK_GET_TB_META_BATCH == pTask->type) {
        SCtgTbMetasCtx* pMetasCtx = (SCtgTbMetasCtx*)pTask->taskCtx;
        SCtgFetch*      pFetch = taosArrayGet(pMetasCtx->pFetchs, tReq->msgIdx);
        pName = ctgGetFetchName(pMetasCtx->pNames, pFetch);
      } else {
        pName = ((SCtgTbMetaCtx*)pTask->taskCtx)->pName;
      }
    } else {
      ctgError("invalid vnode msgType %d", msgType);
      CTG_ERR_JRET(TSDB_CODE_APP_ERROR);
    }

    tNameGetFullDbName(pName, pTarget->dbFName);
  }

  if (NULL == pBatch) {
    // First request for this vgId: finish the new batch and publish it in the hash.
    newBatch.msgType = (vgId > 0) ? TDMT_VND_BATCH_META : TDMT_MND_BATCH_META;
    newBatch.batchId = atomic_add_fetch_32(&pJob->batchId, 1);

    if (0 != taosHashPut(pBatchs, &vgId, sizeof(vgId), &newBatch, sizeof(newBatch))) {
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }
  }

  ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), pTarget->batchId,
           vgId);

  return TSDB_CODE_SUCCESS;

_return:

  // newBatch is still all-zero when the failure happened while appending to an existing batch.
  ctgFreeBatch(&newBatch);
  taosMemoryFree(msg);

  return code;
}

D
dapan1121 已提交
575
// Serialize all requests queued in a batch into one newly allocated batch request message.
//
// pBatch  batch whose pMsgs array is serialized
// vgId    vgroup id written into the batch request header
// msg     [out] receives the allocated buffer; it is assigned as soon as the allocation succeeds,
//         including when the subsequent serialization fails
// pSize   [out] receives the serialized size; only written on success
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_CTG_INVALID_INPUT when the batch holds CTG_MAX_REQ_IN_BATCH or more
// requests, or TSDB_CODE_OUT_OF_MEMORY when sizing, allocating or serializing fails.
int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg, int32_t *pSize) {
  int32_t reqNum = taosArrayGetSize(pBatch->pMsgs);
  if (reqNum >= CTG_MAX_REQ_IN_BATCH) {
    qError("too many msgs %d in one batch request", reqNum);
    CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
  }

  SBatchReq batchReq = {0};
  batchReq.pMsgs = pBatch->pMsgs;
  batchReq.header.vgId = vgId;

  // First pass with a NULL buffer only computes the encoded length.
  int32_t encodedLen = tSerializeSBatchReq(NULL, 0, &batchReq);
  if (encodedLen < 0) {
    qError("tSerializeSBatchReq failed");
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  void* pBuf = taosMemoryCalloc(1, encodedLen);
  *msg = pBuf;
  if (NULL == pBuf) {
    qError("calloc batchReq msg failed, size:%d", encodedLen);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  // Second pass writes the request into the buffer.
  if (tSerializeSBatchReq(pBuf, encodedLen, &batchReq) < 0) {
    qError("tSerializeSBatchReq failed");
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

  *pSize = encodedLen;

  qDebug("batch req %d to vg %d msg built with %d meta reqs", pBatch->batchId, vgId, reqNum);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
610
// Build and send one batch request for every batch stored in pBatchs.
//
// pCtg     catalog handle
// pJob     job the batches belong to
// pBatchs  hash of SCtgBatch keyed by vgId
//
// For each batch the queued requests are serialized (ctgBuildBatchReqMsg) and sent (ctgAsyncSendMsg).
// The batch's pTaskIds pointer is cleared right after the send call, whatever its result, because the
// array was passed on to the send path.
//
// Returns TSDB_CODE_SUCCESS, or the first error from building or sending; in that case the hash iteration
// is cancelled and the not-yet-sent message buffer is freed.
int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob* pJob, SHashObj* pBatchs) {
  int32_t code = 0;
  void*   msg = NULL;
  void*   p = taosHashIterate(pBatchs, NULL);
  while (NULL != p) {
    size_t     len = 0;
    int32_t*   vgId = taosHashGetKey(p, &len);
    SCtgBatch* pBatch = (SCtgBatch*)p;
    int32_t    msgSize = 0;

    ctgDebug("QID:0x%" PRIx64 " ctg start to launch batch %d", pJob->queryId, pBatch->batchId);

    CTG_ERR_JRET(ctgBuildBatchReqMsg(pBatch, *vgId, &msg, &msgSize));
    code = ctgAsyncSendMsg(pCtg, &pBatch->conn, pJob, pBatch->pTaskIds, pBatch->batchId, pBatch->pMsgIdxs,
                           pBatch->dbFName, *vgId, pBatch->msgType, msg, msgSize);
    pBatch->pTaskIds = NULL;
    // NOTE(review): on a send failure the buffer is still freed at _return, as before; whether the send
    // path has already released it in that case cannot be told from this file - confirm.
    CTG_ERR_JRET(code);

    // The buffer was attached to the send-info and sent. Forget it here so that a failure in a later
    // iteration (e.g. ctgBuildBatchReqMsg bailing out before it assigns a new buffer) cannot free it.
    msg = NULL;

    p = taosHashIterate(pBatchs, p);
  }

  return TSDB_CODE_SUCCESS;

_return:

  if (p) {
    taosHashCancelIterate(pBatchs, p);
  }
  taosMemoryFree(msg);

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
643 644
// Request the qnode list from the mnode.
//
// pCtg   catalog handle
// pConn  connection info (transport handle and mgmt endpoint set)
// out    destination array, filled only in the synchronous case (pTask == NULL)
// pTask  when non-NULL the request is issued asynchronously on behalf of this task: the output array is
//        created here and stored in the task's message context, and the request is queued/sent;
//        when NULL the request is sent with rpcSendRecv and the response is decoded into `out`
//
// The request buffer is allocated with taosMemoryMalloc for the task case and rpcMallocCont otherwise.
//
// Returns TSDB_CODE_SUCCESS or the first error encountered.
int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray* out, SCtgTask* pTask) {
  char*   msg = NULL;
  int32_t msgLen = 0;
  int32_t reqType = TDMT_MND_QNODE_LIST;
  void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;

  ctgDebug("try to get qnode list from mnode, mgmtEpInUse:%d", pConn->mgmtEps.inUse);

  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](NULL, &msg, 0, &msgLen, mallocFp);
  if (code) {
    ctgError("Build qnode list msg failed, error:%s", tstrerror(code));
    CTG_ERR_RET(code);
  }

  if (pTask) {
    void* pOut = taosArrayInit(4, sizeof(SQueryNodeLoad));
    if (NULL == pOut) {
      // The request buffer has not been handed to anyone yet; release it before bailing out.
      taosMemoryFree(msg);
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    // NOTE(review): pOut is passed to the message context here; it is not freed on failure because
    // its ownership after a failed ctgUpdateMsgCtx cannot be determined from this file.
    code = ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, NULL);
    if (code) {
      taosMemoryFree(msg);
      CTG_ERR_RET(code);
    }

#if CTG_BATCH_FETCH
    SCtgTaskReq tReq;
    tReq.pTask = pTask;
    tReq.msgIdx = -1;
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
#else
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      taosMemoryFree(msg);
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    if (NULL == taosArrayPush(pTaskId, &pTask->taskId)) {
      taosArrayDestroy(pTaskId);
      taosMemoryFree(msg);
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
#endif
  }

  SRpcMsg rpcMsg = {
      .msgType = reqType,
      .pCont = msg,
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);

  code = ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, NULL);

  // Release the response payload whether or not decoding succeeded.
  if (NULL != rpcRsp.pCont) {
    rpcFreeCont(rpcRsp.pCont);
  }

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
697 698
// Request the dnode list from the mnode.
//
// pCtg   catalog handle
// pConn  connection info (transport handle and mgmt endpoint set)
// out    destination array pointer, filled only in the synchronous case (pTask == NULL)
// pTask  when non-NULL the request is issued asynchronously on behalf of this task (the task's message
//        context is reset with a NULL output) and the request is queued/sent;
//        when NULL the request is sent with rpcSendRecv and the response is decoded into `out`
//
// The request buffer is allocated with taosMemoryMalloc for the task case and rpcMallocCont otherwise.
//
// Returns TSDB_CODE_SUCCESS or the first error encountered.
int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray** out, SCtgTask* pTask) {
  char*   msg = NULL;
  int32_t msgLen = 0;
  int32_t reqType = TDMT_MND_DNODE_LIST;
  void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;

  ctgDebug("try to get dnode list from mnode, mgmtEpInUse:%d", pConn->mgmtEps.inUse);

  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](NULL, &msg, 0, &msgLen, mallocFp);
  if (code) {
    ctgError("Build dnode list msg failed, error:%s", tstrerror(code));
    CTG_ERR_RET(code);
  }

  if (pTask) {
    code = ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, NULL, NULL);
    if (code) {
      // The request buffer has not been handed to anyone yet; release it before bailing out.
      taosMemoryFree(msg);
      CTG_ERR_RET(code);
    }

#if CTG_BATCH_FETCH
    SCtgTaskReq tReq;
    tReq.pTask = pTask;
    tReq.msgIdx = -1;
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
#else
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      taosMemoryFree(msg);
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    if (NULL == taosArrayPush(pTaskId, &pTask->taskId)) {
      taosArrayDestroy(pTaskId);
      taosMemoryFree(msg);
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
#endif
  }

  SRpcMsg rpcMsg = {
      .msgType = reqType,
      .pCont = msg,
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);

  code = ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, NULL);

  // Release the response payload whether or not decoding succeeded.
  if (NULL != rpcRsp.pCont) {
    rpcFreeCont(rpcRsp.pCont);
  }

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
746
int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SBuildUseDBInput* input, SUseDbOutput* out,
D
dapan1121 已提交
747
                                SCtgTaskReq* tReq) {
dengyihao's avatar
dengyihao 已提交
748 749 750
  char*     msg = NULL;
  int32_t   msgLen = 0;
  int32_t   reqType = TDMT_MND_USE_DB;
D
dapan1121 已提交
751
  SCtgTask* pTask = tReq ? tReq->pTask : NULL;
wafwerar's avatar
wafwerar 已提交
752
  void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
D
dapan1121 已提交
753 754 755

  ctgDebug("try to get db vgInfo from mnode, dbFName:%s", input->db);

D
dapan1121 已提交
756
  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](input, &msg, 0, &msgLen, mallocFp);
D
dapan1121 已提交
757 758 759 760 761 762 763 764 765 766
  if (code) {
    ctgError("Build use db msg failed, code:%x, db:%s", code, input->db);
    CTG_ERR_RET(code);
  }

  if (pTask) {
    void* pOut = taosMemoryCalloc(1, sizeof(SUseDbOutput));
    if (NULL == pOut) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
D
dapan1121 已提交
767

D
dapan1121 已提交
768
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx), reqType, pOut, input->db));
D
dapan1121 已提交
769

770
#if CTG_BATCH_FETCH
D
dapan1121 已提交
771
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, tReq, reqType, msg, msgLen));
772
#else
D
dapan1121 已提交
773 774 775 776 777
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);
dengyihao's avatar
dengyihao 已提交
778

D
dapan1121 已提交
779
    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
780
#endif
D
dapan1121 已提交
781
  }
dengyihao's avatar
dengyihao 已提交
782

D
dapan1121 已提交
783 784
  SRpcMsg rpcMsg = {
      .msgType = reqType,
dengyihao's avatar
dengyihao 已提交
785
      .pCont = msg,
D
dapan1121 已提交
786 787 788 789
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
D
dapan1121 已提交
790
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
791 792

  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, input->db));
dengyihao's avatar
dengyihao 已提交
793

D
dapan1121 已提交
794
  rpcFreeCont(rpcRsp.pCont);
dengyihao's avatar
dengyihao 已提交
795

D
dapan1121 已提交
796 797 798
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
799 800 801
/**
 * Request a database's configuration (TDMT_MND_GET_DB_CFG) using the mgmt endpoint set in pConn.
 *
 * - pTask != NULL: an SDbCfgInfo is allocated and registered, together with dbFName,
 *   on the task's message context; the request is then handed to ctgAddBatch()
 *   (CTG_BATCH_FETCH) or ctgAsyncSendMsg() and the function returns through CTG_RET()
 *   with that call's result. `out` is not used on this path.
 * - pTask == NULL: the request is sent with rpcSendRecv() and the response is decoded
 *   into `out` by ctgProcessRspMsg().
 *
 * @param pCtg     catalog handle forwarded to the batch/async helpers
 * @param pConn    connection info; pTrans and mgmtEps are used for the synchronous call
 * @param dbFName  db name passed to the request builder and used as the target name
 * @param out      destination passed to ctgProcessRspMsg() on the synchronous path
 * @param pTask    owning task, or NULL for a synchronous request
 * @return TSDB_CODE_SUCCESS on the synchronous path when decoding succeeds, otherwise
 *         the code produced by the failing step
 *
 * NOTE(review): `msg` is not released on the early error returns inside the pTask
 * branch — confirm who owns it there.
 */
int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName, SDbCfgInfo* out,
                             SCtgTask* pTask) {
  char*   msg = NULL;
  int32_t msgLen = 0;
  int32_t reqType = TDMT_MND_GET_DB_CFG;
  // Task-driven requests are built with the plain allocator, synchronous ones with the RPC allocator.
  void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;

  ctgDebug("try to get db cfg from mnode, dbFName:%s", dbFName);

  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)dbFName, &msg, 0, &msgLen, mallocFp);
  if (code) {
    ctgError("Build get db cfg msg failed, code:%x, db:%s", code, dbFName);
    CTG_ERR_RET(code);
  }

  if (pTask) {
    void* pOut = taosMemoryCalloc(1, sizeof(SDbCfgInfo));
    if (NULL == pOut) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, (char*)dbFName));

#if CTG_BATCH_FETCH
    // Designated initializer so that any field other than pTask/msgIdx is zeroed.
    SCtgTaskReq tReq = {.pTask = pTask, .msgIdx = -1};
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
#else
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);

    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
#endif
  }

  // Use reqType (== TDMT_MND_GET_DB_CFG) like the sibling request helpers do.
  SRpcMsg rpcMsg = {
      .msgType = reqType,
      .pCont = msg,
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);

  // Decode first, then release the response buffer on success AND failure;
  // returning directly from a failed decode used to leak rpcRsp.pCont.
  code = ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)dbFName);
  if (rpcRsp.pCont) {
    rpcFreeCont(rpcRsp.pCont);
  }
  CTG_ERR_RET(code);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
854 855 856
/**
 * Request index info (TDMT_MND_GET_INDEX) for indexName using the mgmt endpoint set in pConn.
 *
 * - pTask != NULL: an SIndexInfo is allocated and registered, together with indexName,
 *   on the task's message context; the request is then handed to ctgAddBatch()
 *   (CTG_BATCH_FETCH) or ctgAsyncSendMsg() and the function returns through CTG_RET()
 *   with that call's result. `out` is not used on this path.
 * - pTask == NULL: the request is sent with rpcSendRecv() and the response is decoded
 *   into `out` by ctgProcessRspMsg().
 *
 * @param pCtg       catalog handle forwarded to the batch/async helpers
 * @param pConn      connection info; pTrans and mgmtEps are used for the synchronous call
 * @param indexName  index name passed to the request builder and used as the target name
 * @param out        destination passed to ctgProcessRspMsg() on the synchronous path
 * @param pTask      owning task, or NULL for a synchronous request
 * @return TSDB_CODE_SUCCESS on the synchronous path when decoding succeeds, otherwise
 *         the code produced by the failing step
 *
 * NOTE(review): `msg` is not released on the early error returns inside the pTask
 * branch — confirm who owns it there.
 */
int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* indexName, SIndexInfo* out,
                                 SCtgTask* pTask) {
  char*   msg = NULL;
  int32_t msgLen = 0;
  int32_t reqType = TDMT_MND_GET_INDEX;
  // Task-driven requests are built with the plain allocator, synchronous ones with the RPC allocator.
  void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;

  ctgDebug("try to get index from mnode, indexName:%s", indexName);

  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)indexName, &msg, 0, &msgLen, mallocFp);
  if (code) {
    // The value logged is the index name, so label it as such (it was labelled "db").
    ctgError("Build get index msg failed, code:%x, indexName:%s", code, indexName);
    CTG_ERR_RET(code);
  }

  if (pTask) {
    void* pOut = taosMemoryCalloc(1, sizeof(SIndexInfo));
    if (NULL == pOut) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, (char*)indexName));

#if CTG_BATCH_FETCH
    // Designated initializer so that any field other than pTask/msgIdx is zeroed.
    SCtgTaskReq tReq = {.pTask = pTask, .msgIdx = -1};
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
#else
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);

    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
#endif
  }

  SRpcMsg rpcMsg = {
      .msgType = reqType,
      .pCont = msg,
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);

  // Decode first, then release the response buffer on success AND failure;
  // returning directly from a failed decode used to leak rpcRsp.pCont.
  code = ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)indexName);
  if (rpcRsp.pCont) {
    rpcFreeCont(rpcRsp.pCont);
  }
  CTG_ERR_RET(code);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
909 910 911
/**
 * Request a table's index list (TDMT_MND_GET_TABLE_INDEX) using the mgmt endpoint set in pConn.
 * The full table name is derived from `name` with tNameExtractFullName().
 *
 * - pTask != NULL: an STableIndex is allocated and registered, together with the full
 *   table name, on the task's message context; the request is then handed to
 *   ctgAddBatch() (CTG_BATCH_FETCH) or ctgAsyncSendMsg() and the function returns
 *   through CTG_RET() with that call's result. `out` is not used on this path.
 * - pTask == NULL: the request is sent with rpcSendRecv() and the response is decoded
 *   into `out` by ctgProcessRspMsg().
 *
 * @param pCtg   catalog handle forwarded to the batch/async helpers
 * @param pConn  connection info; pTrans and mgmtEps are used for the synchronous call
 * @param name   table name the request is built for
 * @param out    destination passed to ctgProcessRspMsg() on the synchronous path
 * @param pTask  owning task, or NULL for a synchronous request
 * @return TSDB_CODE_SUCCESS on the synchronous path when decoding succeeds, otherwise
 *         the code produced by the failing step
 *
 * NOTE(review): `msg` is not released on the early error returns inside the pTask
 * branch — confirm who owns it there.
 */
int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* name, STableIndex* out,
                               SCtgTask* pTask) {
  char*   msg = NULL;
  int32_t msgLen = 0;
  int32_t reqType = TDMT_MND_GET_TABLE_INDEX;
  // Task-driven requests are built with the plain allocator, synchronous ones with the RPC allocator.
  void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
  char tbFName[TSDB_TABLE_FNAME_LEN];
  tNameExtractFullName(name, tbFName);

  ctgDebug("try to get tb index from mnode, tbFName:%s", tbFName);

  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)tbFName, &msg, 0, &msgLen, mallocFp);
  if (code) {
    ctgError("Build get index msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName);
    CTG_ERR_RET(code);
  }

  if (pTask) {
    void* pOut = taosMemoryCalloc(1, sizeof(STableIndex));
    if (NULL == pOut) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, (char*)tbFName));

#if CTG_BATCH_FETCH
    // Designated initializer so that any field other than pTask/msgIdx is zeroed.
    SCtgTaskReq tReq = {.pTask = pTask, .msgIdx = -1};
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
#else
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);

    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
#endif
  }

  SRpcMsg rpcMsg = {
      .msgType = reqType,
      .pCont = msg,
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);

  // Decode first, then release the response buffer on success AND failure;
  // returning directly from a failed decode used to leak rpcRsp.pCont.
  code = ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName);
  if (rpcRsp.pCont) {
    rpcFreeCont(rpcRsp.pCont);
  }
  CTG_ERR_RET(code);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
966 967 968
/**
 * Request UDF info (TDMT_MND_RETRIEVE_FUNC) for funcName using the mgmt endpoint set in pConn.
 *
 * - pTask != NULL: an SFuncInfo is allocated and registered, together with funcName,
 *   on the task's message context; the request is then handed to ctgAddBatch()
 *   (CTG_BATCH_FETCH) or ctgAsyncSendMsg() and the function returns through CTG_RET()
 *   with that call's result. `out` is not used on this path.
 * - pTask == NULL: the request is sent with rpcSendRecv() and the response is decoded
 *   into `out` by ctgProcessRspMsg().
 *
 * @param pCtg      catalog handle forwarded to the batch/async helpers
 * @param pConn     connection info; pTrans and mgmtEps are used for the synchronous call
 * @param funcName  function name passed to the request builder and used as the target name
 * @param out       destination passed to ctgProcessRspMsg() on the synchronous path
 * @param pTask     owning task, or NULL for a synchronous request
 * @return TSDB_CODE_SUCCESS on the synchronous path when decoding succeeds, otherwise
 *         the code produced by the failing step
 *
 * NOTE(review): `msg` is not released on the early error returns inside the pTask
 * branch — confirm who owns it there.
 */
int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* funcName, SFuncInfo* out,
                               SCtgTask* pTask) {
  char*   msg = NULL;
  int32_t msgLen = 0;
  int32_t reqType = TDMT_MND_RETRIEVE_FUNC;
  // Task-driven requests are built with the plain allocator, synchronous ones with the RPC allocator.
  void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;

  ctgDebug("try to get udf info from mnode, funcName:%s", funcName);

  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)funcName, &msg, 0, &msgLen, mallocFp);
  if (code) {
    // The value logged is the function name, so label it as such (it was labelled "db").
    ctgError("Build get udf msg failed, code:%x, funcName:%s", code, funcName);
    CTG_ERR_RET(code);
  }

  if (pTask) {
    void* pOut = taosMemoryCalloc(1, sizeof(SFuncInfo));
    if (NULL == pOut) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, (char*)funcName));

#if CTG_BATCH_FETCH
    // Designated initializer so that any field other than pTask/msgIdx is zeroed.
    SCtgTaskReq tReq = {.pTask = pTask, .msgIdx = -1};
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
#else
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);

    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
#endif
  }

  SRpcMsg rpcMsg = {
      .msgType = reqType,
      .pCont = msg,
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);

  // Decode first, then release the response buffer on success AND failure;
  // returning directly from a failed decode used to leak rpcRsp.pCont.
  code = ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)funcName);
  if (rpcRsp.pCont) {
    rpcFreeCont(rpcRsp.pCont);
  }
  CTG_ERR_RET(code);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1021 1022 1023
/**
 * Request a user's auth info (TDMT_MND_GET_USER_AUTH) using the mgmt endpoint set in pConn.
 *
 * - pTask != NULL: an SGetUserAuthRsp is allocated and registered, together with the
 *   user name, on the task's message context; the request is then handed to
 *   ctgAddBatch() (CTG_BATCH_FETCH) or ctgAsyncSendMsg() and the function returns
 *   through CTG_RET() with that call's result. `out` is not used on this path.
 * - pTask == NULL: the request is sent with rpcSendRecv() and the response is decoded
 *   into `out` by ctgProcessRspMsg().
 *
 * @param pCtg   catalog handle forwarded to the batch/async helpers
 * @param pConn  connection info; pTrans and mgmtEps are used for the synchronous call
 * @param user   user name passed to the request builder and used as the target name
 * @param out    destination passed to ctgProcessRspMsg() on the synchronous path
 * @param pTask  owning task, or NULL for a synchronous request
 * @return TSDB_CODE_SUCCESS on the synchronous path when decoding succeeds, otherwise
 *         the code produced by the failing step
 *
 * NOTE(review): `msg` is not released on the early error returns inside the pTask
 * branch — confirm who owns it there.
 */
int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* user, SGetUserAuthRsp* out,
                                  SCtgTask* pTask) {
  char*   msg = NULL;
  int32_t msgLen = 0;
  int32_t reqType = TDMT_MND_GET_USER_AUTH;
  // Task-driven requests are built with the plain allocator, synchronous ones with the RPC allocator.
  void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;

  ctgDebug("try to get user auth from mnode, user:%s", user);

  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)user, &msg, 0, &msgLen, mallocFp);
  if (code) {
    // The value logged is the user name, so label it as such (it was labelled "db").
    ctgError("Build get user auth msg failed, code:%x, user:%s", code, user);
    CTG_ERR_RET(code);
  }

  if (pTask) {
    void* pOut = taosMemoryCalloc(1, sizeof(SGetUserAuthRsp));
    if (NULL == pOut) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }

    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, (char*)user));

#if CTG_BATCH_FETCH
    // Designated initializer so that any field other than pTask/msgIdx is zeroed.
    SCtgTaskReq tReq = {.pTask = pTask, .msgIdx = -1};
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
#else
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);

    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
#endif
  }

  SRpcMsg rpcMsg = {
      .msgType = reqType,
      .pCont = msg,
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);

  // Decode first, then release the response buffer on success AND failure;
  // returning directly from a failed decode used to leak rpcRsp.pCont.
  code = ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)user);
  if (rpcRsp.pCont) {
    rpcFreeCont(rpcRsp.pCont);
  }
  CTG_ERR_RET(code);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1076
int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo* pConn, char* dbFName, char* tbName,
D
dapan1121 已提交
1077
                                  STableMetaOutput* out, SCtgTaskReq* tReq) {
dengyihao's avatar
dengyihao 已提交
1078
  SCtgTask*        pTask = tReq ? tReq->pTask : NULL;
D
dapan1121 已提交
1079
  SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = tbName};
dengyihao's avatar
dengyihao 已提交
1080 1081 1082 1083 1084
  char*            msg = NULL;
  SEpSet*          pVnodeEpSet = NULL;
  int32_t          msgLen = 0;
  int32_t          reqType = TDMT_MND_TABLE_META;
  char             tbFName[TSDB_TABLE_FNAME_LEN];
D
dapan1121 已提交
1085
  sprintf(tbFName, "%s.%s", dbFName, tbName);
wafwerar's avatar
wafwerar 已提交
1086
  void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
D
dapan1121 已提交
1087 1088 1089

  ctgDebug("try to get table meta from mnode, tbFName:%s", tbFName);

D
dapan1121 已提交
1090
  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp);
D
dapan1121 已提交
1091 1092 1093 1094 1095 1096 1097 1098 1099 1100
  if (code) {
    ctgError("Build mnode stablemeta msg failed, code:%x", code);
    CTG_ERR_RET(code);
  }

  if (pTask) {
    void* pOut = taosMemoryCalloc(1, sizeof(STableMetaOutput));
    if (NULL == pOut) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
D
dapan1121 已提交
1101

D
dapan1121 已提交
1102
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx), reqType, pOut, tbFName));
dengyihao's avatar
dengyihao 已提交
1103

1104
#if CTG_BATCH_FETCH
D
dapan1121 已提交
1105
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, tReq, reqType, msg, msgLen));
dengyihao's avatar
dengyihao 已提交
1106
#else
D
dapan1121 已提交
1107 1108 1109 1110 1111
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);
dengyihao's avatar
dengyihao 已提交
1112

D
dapan1121 已提交
1113
    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
1114
#endif
D
dapan1121 已提交
1115 1116 1117 1118
  }

  SRpcMsg rpcMsg = {
      .msgType = reqType,
dengyihao's avatar
dengyihao 已提交
1119
      .pCont = msg,
D
dapan1121 已提交
1120 1121 1122 1123
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
D
dapan1121 已提交
1124
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);
dengyihao's avatar
dengyihao 已提交
1125

D
dapan1121 已提交
1126 1127
  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, tbFName));

D
dapan1121 已提交
1128 1129
  rpcFreeCont(rpcRsp.pCont);

D
dapan1121 已提交
1130 1131 1132
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1133
int32_t ctgGetTbMetaFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, STableMetaOutput* out,
D
dapan1121 已提交
1134
                              SCtgTaskReq* tReq) {
D
dapan1121 已提交
1135 1136 1137
  char dbFName[TSDB_DB_FNAME_LEN];
  tNameGetFullDbName(pTableName, dbFName);

D
dapan1121 已提交
1138
  return ctgGetTbMetaFromMnodeImpl(pCtg, pConn, dbFName, (char*)pTableName->tname, out, tReq);
D
dapan1121 已提交
1139 1140
}

dengyihao's avatar
dengyihao 已提交
1141
int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* vgroupInfo,
D
dapan1121 已提交
1142
                              STableMetaOutput* out, SCtgTaskReq* tReq) {
dengyihao's avatar
dengyihao 已提交
1143 1144
  SCtgTask* pTask = tReq ? tReq->pTask : NULL;
  char      dbFName[TSDB_DB_FNAME_LEN];
D
dapan1121 已提交
1145 1146
  tNameGetFullDbName(pTableName, dbFName);
  int32_t reqType = TDMT_VND_TABLE_META;
dengyihao's avatar
dengyihao 已提交
1147
  char    tbFName[TSDB_TABLE_FNAME_LEN];
D
dapan1121 已提交
1148
  sprintf(tbFName, "%s.%s", dbFName, pTableName->tname);
wafwerar's avatar
wafwerar 已提交
1149
  void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
D
dapan1121 已提交
1150

D
dapan1121 已提交
1151
  SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse];
dengyihao's avatar
dengyihao 已提交
1152 1153
  ctgDebug("try to get table meta from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s", vgroupInfo->vgId,
           vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName);
D
dapan1121 已提交
1154

dengyihao's avatar
dengyihao 已提交
1155 1156 1157
  SBuildTableInput bInput = {
      .vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char*)tNameGetTableName(pTableName)};
  char*   msg = NULL;
D
dapan1121 已提交
1158 1159
  int32_t msgLen = 0;

D
dapan1121 已提交
1160
  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp);
D
dapan1121 已提交
1161 1162 1163 1164 1165 1166 1167 1168 1169 1170
  if (code) {
    ctgError("Build vnode tablemeta msg failed, code:%x, tbFName:%s", code, tbFName);
    CTG_ERR_RET(code);
  }

  if (pTask) {
    void* pOut = taosMemoryCalloc(1, sizeof(STableMetaOutput));
    if (NULL == pOut) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
D
dapan1121 已提交
1171

dengyihao's avatar
dengyihao 已提交
1172 1173 1174 1175
    SRequestConnInfo vConn = {.pTrans = pConn->pTrans,
                              .requestId = pConn->requestId,
                              .requestObjRefId = pConn->requestObjRefId,
                              .mgmtEps = vgroupInfo->epSet};
D
dapan1121 已提交
1176

dengyihao's avatar
dengyihao 已提交
1177
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx), reqType, pOut, tbFName));
D
dapan1121 已提交
1178

D
dapan1121 已提交
1179
#if CTG_BATCH_FETCH
D
dapan1121 已提交
1180
    CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, tReq, reqType, msg, msgLen));
dengyihao's avatar
dengyihao 已提交
1181
#else
D
dapan1121 已提交
1182
    SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
dengyihao's avatar
dengyihao 已提交
1183
    char           dbFName[TSDB_DB_FNAME_LEN];
D
dapan1121 已提交
1184 1185 1186 1187 1188 1189 1190
    tNameGetFullDbName(ctx->pName, dbFName);
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);

D
dapan1121 已提交
1191
    CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, NULL, dbFName, ctx->vgId, reqType, msg, msgLen));
dengyihao's avatar
dengyihao 已提交
1192
#endif
D
dapan1121 已提交
1193 1194 1195 1196
  }

  SRpcMsg rpcMsg = {
      .msgType = reqType,
dengyihao's avatar
dengyihao 已提交
1197
      .pCont = msg,
D
dapan1121 已提交
1198 1199 1200 1201
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
D
dapan1121 已提交
1202
  rpcSendRecv(pConn->pTrans, &vgroupInfo->epSet, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
1203

dengyihao's avatar
dengyihao 已提交
1204
  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, tbFName));
D
dapan1121 已提交
1205

D
dapan1121 已提交
1206 1207
  rpcFreeCont(rpcRsp.pCont);

D
dapan1121 已提交
1208 1209 1210
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1211 1212 1213
/**
 * Request a table's configuration (TDMT_VND_TABLE_CFG) from the vgroup described by vgroupInfo.
 *
 * - pTask != NULL: the request type and full table name are registered on the task's
 *   message context. The request is then handed to ctgAddBatch() (CTG_BATCH_FETCH) or
 *   ctgAsyncSendMsg() with a copy of pConn whose endpoint set is vgroupInfo->epSet,
 *   and the function returns through CTG_RET() with that call's result. `out` is not
 *   used on this path.
 * - pTask == NULL: the request is sent with rpcSendRecv() to vgroupInfo->epSet and the
 *   response is decoded into `out` by ctgProcessRspMsg().
 *
 * @param pCtg        catalog handle forwarded to the batch/async helpers
 * @param pConn       connection info; pTrans is used for the synchronous call
 * @param pTableName  table whose configuration is requested
 * @param vgroupInfo  target vgroup (vgId and endpoint set)
 * @param out         destination passed to ctgProcessRspMsg() on the synchronous path
 * @param pTask       owning task, or NULL for a synchronous request
 * @return TSDB_CODE_SUCCESS on the synchronous path when decoding succeeds, otherwise
 *         the code produced by the failing step
 *
 * NOTE(review): `msg` is not released on the early error returns inside the pTask
 * branch — confirm who owns it there.
 */
int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName,
                                SVgroupInfo* vgroupInfo, STableCfg** out, SCtgTask* pTask) {
  char*   msg = NULL;
  int32_t msgLen = 0;
  int32_t reqType = TDMT_VND_TABLE_CFG;
  char    tbFName[TSDB_TABLE_FNAME_LEN];
  tNameExtractFullName(pTableName, tbFName);
  // Task-driven requests are built with the plain allocator, synchronous ones with the RPC allocator.
  void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
  char dbFName[TSDB_DB_FNAME_LEN];
  tNameGetFullDbName(pTableName, dbFName);
  SBuildTableInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char*)pTableName->tname};

  SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse];
  ctgDebug("try to get table cfg from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s", vgroupInfo->vgId,
           vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName);

  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp);
  if (code) {
    ctgError("Build get tb cfg msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName);
    CTG_ERR_RET(code);
  }

  if (pTask) {
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, NULL, (char*)tbFName));

    // Same transport/request ids as pConn, but addressed to the vgroup's endpoints.
    SRequestConnInfo vConn = {.pTrans = pConn->pTrans,
                              .requestId = pConn->requestId,
                              .requestObjRefId = pConn->requestObjRefId,
                              .mgmtEps = vgroupInfo->epSet};
#if CTG_BATCH_FETCH
    // Designated initializer so that any field other than pTask/msgIdx is zeroed.
    SCtgTaskReq tReq = {.pTask = pTask, .msgIdx = -1};
    CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, &tReq, reqType, msg, msgLen));
#else
    SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx;
    // Distinct name: this buffer is filled from the task context and must not shadow dbFName above.
    char          ctxDbFName[TSDB_DB_FNAME_LEN];
    tNameGetFullDbName(ctx->pName, ctxDbFName);
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);

    CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, NULL, ctxDbFName, ctx->pVgInfo->vgId, reqType, msg,
                            msgLen));
#endif
  }

  SRpcMsg rpcMsg = {
      .msgType = reqType,
      .pCont = msg,
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
  rpcSendRecv(pConn->pTrans, &vgroupInfo->epSet, &rpcMsg, &rpcRsp);

  // Decode first, then release the response buffer on success AND failure;
  // returning directly from a failed decode used to leak rpcRsp.pCont.
  code = ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName);
  if (rpcRsp.pCont) {
    rpcFreeCont(rpcRsp.pCont);
  }
  CTG_ERR_RET(code);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1276 1277 1278
/**
 * Request a table's configuration (TDMT_MND_TABLE_CFG) using the mgmt endpoint set in pConn.
 *
 * - pTask != NULL: the request type and full table name are registered on the task's
 *   message context; the request is then handed to ctgAddBatch() (CTG_BATCH_FETCH) or
 *   ctgAsyncSendMsg() and the function returns through CTG_RET() with that call's
 *   result. `out` is not used on this path.
 * - pTask == NULL: the request is sent with rpcSendRecv() and the response is decoded
 *   into `out` by ctgProcessRspMsg().
 *
 * @param pCtg        catalog handle forwarded to the batch/async helpers
 * @param pConn       connection info; pTrans and mgmtEps are used for the synchronous call
 * @param pTableName  table whose configuration is requested
 * @param out         destination passed to ctgProcessRspMsg() on the synchronous path
 * @param pTask       owning task, or NULL for a synchronous request
 * @return TSDB_CODE_SUCCESS on the synchronous path when decoding succeeds, otherwise
 *         the code produced by the failing step
 *
 * NOTE(review): `msg` is not released on the early error return inside the pTask
 * branch — confirm who owns it there.
 */
int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, STableCfg** out,
                                SCtgTask* pTask) {
  char*   msg = NULL;
  int32_t msgLen = 0;
  int32_t reqType = TDMT_MND_TABLE_CFG;
  char    tbFName[TSDB_TABLE_FNAME_LEN];
  tNameExtractFullName(pTableName, tbFName);
  // Task-driven requests are built with the plain allocator, synchronous ones with the RPC allocator.
  void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
  char dbFName[TSDB_DB_FNAME_LEN];
  tNameGetFullDbName(pTableName, dbFName);
  SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = (char*)pTableName->tname};

  ctgDebug("try to get table cfg from mnode, tbFName:%s", tbFName);

  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp);
  if (code) {
    ctgError("Build get tb cfg msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName);
    CTG_ERR_RET(code);
  }

  if (pTask) {
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, NULL, (char*)tbFName));

#if CTG_BATCH_FETCH
    // Designated initializer so that any field other than pTask/msgIdx is zeroed.
    SCtgTaskReq tReq = {.pTask = pTask, .msgIdx = -1};
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
#else
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);

    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
#endif
  }

  SRpcMsg rpcMsg = {
      .msgType = reqType,
      .pCont = msg,
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);

  // Decode first, then release the response buffer on success AND failure;
  // returning directly from a failed decode used to leak rpcRsp.pCont.
  code = ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName);
  if (rpcRsp.pCont) {
    rpcFreeCont(rpcRsp.pCont);
  }
  CTG_ERR_RET(code);

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1331 1332
/**
 * Request the server version (TDMT_MND_SERVER_VERSION) using the mgmt endpoint set in pConn.
 *
 * - pTask != NULL: the request type is registered on the task's message context; the
 *   request is then handed to ctgAddBatch() (CTG_BATCH_FETCH) or ctgAsyncSendMsg() and
 *   the function returns through CTG_RET() with that call's result. `out` is not used
 *   on this path.
 * - pTask == NULL: the request is sent with rpcSendRecv() and the response is decoded
 *   into `out` by ctgProcessRspMsg().
 *
 * @param pCtg   catalog handle forwarded to the batch/async helpers
 * @param pConn  connection info; pTrans and mgmtEps are used for the synchronous call
 * @param out    destination passed to ctgProcessRspMsg() on the synchronous path
 * @param pTask  owning task, or NULL for a synchronous request
 * @return TSDB_CODE_SUCCESS on the synchronous path when decoding succeeds, otherwise
 *         the code produced by the failing step
 *
 * NOTE(review): `msg` is not released on the early error return inside the pTask
 * branch — confirm who owns it there.
 */
int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, char** out, SCtgTask* pTask) {
  char*   msg = NULL;
  int32_t msgLen = 0;
  int32_t reqType = TDMT_MND_SERVER_VERSION;
  // Task-driven requests are built with the plain allocator, synchronous ones with the RPC allocator.
  void* (*mallocFp)(int64_t) = pTask ? taosMemoryMalloc : rpcMallocCont;

  qDebug("try to get svr ver from mnode");

  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](NULL, &msg, 0, &msgLen, mallocFp);
  if (code) {
    ctgError("Build get svr ver msg failed, code:%s", tstrerror(code));
    CTG_ERR_RET(code);
  }

  if (pTask) {
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, NULL, NULL));

#if CTG_BATCH_FETCH
    // Designated initializer so that any field other than pTask/msgIdx is zeroed.
    SCtgTaskReq tReq = {.pTask = pTask, .msgIdx = -1};
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
#else
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);

    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
#endif
  }

  SRpcMsg rpcMsg = {
      .msgType = reqType,
      .pCont = msg,
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);

  // Decode first, then release the response buffer on success AND failure;
  // returning directly from a failed decode used to leak rpcRsp.pCont.
  code = ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, NULL);
  if (rpcRsp.pCont) {
    rpcFreeCont(rpcRsp.pCont);
  }
  CTG_ERR_RET(code);

  return TSDB_CODE_SUCCESS;
}