ctgRemote.c 43.3 KB
Newer Older
D
dapan1121 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

#include "catalogInt.h"
dengyihao's avatar
dengyihao 已提交
17
#include "query.h"
D
dapan1121 已提交
18
#include "systable.h"
dengyihao's avatar
dengyihao 已提交
19
#include "tname.h"
D
dapan1121 已提交
20
#include "tref.h"
dengyihao's avatar
dengyihao 已提交
21
#include "trpc.h"
D
dapan1121 已提交
22

dengyihao's avatar
dengyihao 已提交
23 24
int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBuf* pMsg, int32_t rspCode) {
  int32_t   code = 0;
25
  SCatalog* pCtg = pJob->pCtg;
D
dapan1121 已提交
26
  int32_t   taskNum = taosArrayGetSize(cbParam->taskId);
dengyihao's avatar
dengyihao 已提交
27 28
  SDataBuf  taskMsg = *pMsg;
  int32_t   offset = 0;
D
dapan1121 已提交
29
  int32_t msgNum = (TSDB_CODE_SUCCESS == rspCode && pMsg->pData && (pMsg->len > 0)) ? ntohl(*(int32_t*)pMsg->pData) : 0;
D
dapan1121 已提交
30 31
  ASSERT(taskNum == msgNum || 0 == msgNum);

dengyihao's avatar
dengyihao 已提交
32 33
  ctgDebug("QID:0x%" PRIx64 " ctg got batch %d rsp %s", pJob->queryId, cbParam->batchId,
           TMSG_INFO(cbParam->reqType + 1));
D
dapan1121 已提交
34 35 36 37 38 39 40 41

  offset += sizeof(msgNum);
  SBatchRsp rsp = {0};
  SHashObj* pBatchs = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
  if (NULL == pBatchs) {
    ctgError("taosHashInit %d batch failed", taskNum);
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }
dengyihao's avatar
dengyihao 已提交
42

D
dapan1121 已提交
43
  for (int32_t i = 0; i < taskNum; ++i) {
D
dapan1121 已提交
44 45
    int32_t*  taskId = taosArrayGet(cbParam->taskId, i);
    int32_t*  msgIdx = taosArrayGet(cbParam->msgIdx, i);
dengyihao's avatar
dengyihao 已提交
46
    SCtgTask* pTask = taosArrayGet(pJob->pTasks, *taskId);
D
dapan1121 已提交
47
    if (msgNum > 0) {
48
      rsp.reqType = ntohl(*(int32_t*)((char*)pMsg->pData + offset));
D
dapan1121 已提交
49
      offset += sizeof(rsp.reqType);
D
dapan1121 已提交
50 51
      rsp.msgIdx = ntohl(*(int32_t*)((char*)pMsg->pData + offset));
      offset += sizeof(rsp.msgIdx);
52
      rsp.msgLen = ntohl(*(int32_t*)((char*)pMsg->pData + offset));
D
dapan1121 已提交
53
      offset += sizeof(rsp.msgLen);
54
      rsp.rspCode = ntohl(*(int32_t*)((char*)pMsg->pData + offset));
D
dapan1121 已提交
55 56 57
      offset += sizeof(rsp.rspCode);
      rsp.msg = ((char*)pMsg->pData) + offset;
      offset += rsp.msgLen;
dengyihao's avatar
dengyihao 已提交
58

D
dapan1121 已提交
59 60 61
      taskMsg.msgType = rsp.reqType;
      taskMsg.pData = rsp.msg;
      taskMsg.len = rsp.msgLen;
D
dapan1121 已提交
62 63

      ASSERT(rsp.msgIdx == *msgIdx);
D
dapan1121 已提交
64
    } else {
D
dapan1121 已提交
65
      rsp.msgIdx = *msgIdx;
D
dapan1121 已提交
66 67 68 69 70 71 72 73
      rsp.reqType = -1;
      taskMsg.msgType = -1;
      taskMsg.pData = NULL;
      taskMsg.len = 0;
    }

    pTask->pBatchs = pBatchs;
    
D
dapan1121 已提交
74 75 76 77 78 79 80
    SCtgTaskReq tReq;
    tReq.pTask = pTask;
    tReq.msgIdx = rsp.msgIdx;    

    ctgDebug("QID:0x%" PRIx64 " ctg task %d idx %d start to handle rsp %s", pJob->queryId, pTask->taskId, rsp.msgIdx, TMSG_INFO(taskMsg.msgType + 1));

    (*gCtgAsyncFps[pTask->type].handleRspFp)(&tReq, rsp.reqType, &taskMsg, (rsp.rspCode ? rsp.rspCode : rspCode));
D
dapan1121 已提交
81 82 83 84 85 86 87 88 89 90
  }

  CTG_ERR_JRET(ctgLaunchBatchs(pJob->pCtg, pJob, pBatchs));

_return:

  ctgFreeBatchs(pBatchs);
  CTG_RET(code);
}

D
dapan1121 已提交
91 92
int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, int32_t rspCode, char* target) {
  int32_t code = 0;
dengyihao's avatar
dengyihao 已提交
93

D
dapan1121 已提交
94 95 96 97 98 99
  switch (reqType) {
    case TDMT_MND_QNODE_LIST: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for qnode list, error:%s", tstrerror(rspCode));
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
100

D
dapan1121 已提交
101 102 103 104 105
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process qnode list rsp failed, error:%s", tstrerror(rspCode));
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
106

D
dapan1121 已提交
107 108 109
      qDebug("Got qnode list from mnode, listNum:%d", (int32_t)taosArrayGetSize(out));
      break;
    }
D
dapan1121 已提交
110 111 112 113 114
    case TDMT_MND_DNODE_LIST: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for dnode list, error:%s", tstrerror(rspCode));
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
115

D
dapan1121 已提交
116 117 118 119 120
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process dnode list rsp failed, error:%s", tstrerror(rspCode));
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
121

D
dapan1121 已提交
122 123 124
      qDebug("Got dnode list from mnode, listNum:%d", (int32_t)taosArrayGetSize(*(SArray**)out));
      break;
    }
D
dapan1121 已提交
125 126 127 128 129
    case TDMT_MND_USE_DB: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for use db, error:%s, dbFName:%s", tstrerror(rspCode), target);
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
130

D
dapan1121 已提交
131 132 133 134 135
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process use db rsp failed, error:%s, dbFName:%s", tstrerror(code), target);
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
136

D
dapan1121 已提交
137 138 139 140 141 142 143 144
      qDebug("Got db vgInfo from mnode, dbFName:%s", target);
      break;
    }
    case TDMT_MND_GET_DB_CFG: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for get db cfg, error:%s, db:%s", tstrerror(rspCode), target);
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
145

D
dapan1121 已提交
146 147 148 149 150
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process get db cfg rsp failed, error:%s, db:%s", tstrerror(code), target);
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
151

D
dapan1121 已提交
152 153 154 155 156 157 158 159
      qDebug("Got db cfg from mnode, dbFName:%s", target);
      break;
    }
    case TDMT_MND_GET_INDEX: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for get index, error:%s, indexName:%s", tstrerror(rspCode), target);
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
160

D
dapan1121 已提交
161 162 163 164 165
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process get index rsp failed, error:%s, indexName:%s", tstrerror(code), target);
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
166

D
dapan1121 已提交
167 168 169
      qDebug("Got index from mnode, indexName:%s", target);
      break;
    }
D
dapan1121 已提交
170 171 172 173 174
    case TDMT_MND_GET_TABLE_INDEX: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for get table index, error:%s, tbFName:%s", tstrerror(rspCode), target);
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
175

D
dapan1121 已提交
176 177 178 179 180
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process get table index rsp failed, error:%s, tbFName:%s", tstrerror(code), target);
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
181

D
dapan1121 已提交
182 183 184
      qDebug("Got table index from mnode, tbFName:%s", target);
      break;
    }
D
dapan1121 已提交
185 186 187 188 189
    case TDMT_MND_RETRIEVE_FUNC: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for get udf, error:%s, funcName:%s", tstrerror(rspCode), target);
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
190

D
dapan1121 已提交
191 192 193 194 195
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process get udf rsp failed, error:%s, funcName:%s", tstrerror(code), target);
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
196

D
dapan1121 已提交
197 198 199 200 201 202 203 204
      qDebug("Got udf from mnode, funcName:%s", target);
      break;
    }
    case TDMT_MND_GET_USER_AUTH: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for get user auth, error:%s, user:%s", tstrerror(rspCode), target);
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
205

D
dapan1121 已提交
206 207 208 209 210
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process get user auth rsp failed, error:%s, user:%s", tstrerror(code), target);
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
211

D
dapan1121 已提交
212 213 214 215 216 217 218 219 220 221
      qDebug("Got user auth from mnode, user:%s", target);
      break;
    }
    case TDMT_MND_TABLE_META: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        if (CTG_TABLE_NOT_EXIST(rspCode)) {
          SET_META_TYPE_NULL(((STableMetaOutput*)out)->metaType);
          qDebug("stablemeta not exist in mnode, tbFName:%s", target);
          return TSDB_CODE_SUCCESS;
        }
dengyihao's avatar
dengyihao 已提交
222

D
dapan1121 已提交
223 224 225
        qError("error rsp for stablemeta from mnode, error:%s, tbFName:%s", tstrerror(rspCode), target);
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
226

D
dapan1121 已提交
227 228 229 230 231
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process mnode stablemeta rsp failed, error:%s, tbFName:%s", tstrerror(code), target);
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
232

D
dapan1121 已提交
233 234 235 236 237 238 239 240 241 242
      qDebug("Got table meta from mnode, tbFName:%s", target);
      break;
    }
    case TDMT_VND_TABLE_META: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        if (CTG_TABLE_NOT_EXIST(rspCode)) {
          SET_META_TYPE_NULL(((STableMetaOutput*)out)->metaType);
          qDebug("tablemeta not exist in vnode, tbFName:%s", target);
          return TSDB_CODE_SUCCESS;
        }
dengyihao's avatar
dengyihao 已提交
243

D
dapan1121 已提交
244 245 246
        qError("error rsp for table meta from vnode, code:%s, tbFName:%s", tstrerror(rspCode), target);
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
247

D
dapan1121 已提交
248 249 250 251 252
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process vnode tablemeta rsp failed, code:%s, tbFName:%s", tstrerror(code), target);
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
253

D
dapan1121 已提交
254 255 256
      qDebug("Got table meta from vnode, tbFName:%s", target);
      break;
    }
D
dapan1121 已提交
257 258 259 260 261
    case TDMT_VND_TABLE_CFG: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for table cfg from vnode, code:%s, tbFName:%s", tstrerror(rspCode), target);
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
262

D
dapan1121 已提交
263 264 265 266 267
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process vnode tb cfg rsp failed, code:%s, tbFName:%s", tstrerror(code), target);
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
268

D
dapan1121 已提交
269 270 271 272 273 274 275 276
      qDebug("Got table cfg from vnode, tbFName:%s", target);
      break;
    }
    case TDMT_MND_TABLE_CFG: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for stb cfg from mnode, error:%s, tbFName:%s", tstrerror(rspCode), target);
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
277

D
dapan1121 已提交
278 279 280 281 282
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process mnode stb cfg rsp failed, error:%s, tbFName:%s", tstrerror(code), target);
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
283

D
dapan1121 已提交
284 285
      qDebug("Got stb cfg from mnode, tbFName:%s", target);
      break;
dengyihao's avatar
dengyihao 已提交
286
    }
D
dapan1121 已提交
287 288 289 290 291
    case TDMT_MND_SERVER_VERSION: {
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("error rsp for svr ver from mnode, error:%s", tstrerror(rspCode));
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
292

D
dapan1121 已提交
293 294 295 296 297
      code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
      if (code) {
        qError("Process svr ver rsp failed, error:%s", tstrerror(code));
        CTG_ERR_RET(code);
      }
dengyihao's avatar
dengyihao 已提交
298

D
dapan1121 已提交
299 300 301
      qDebug("Got svr ver from mnode");
      break;
    }
D
dapan1121 已提交
302
    default:
D
dapan1121 已提交
303 304 305 306
      if (TSDB_CODE_SUCCESS != rspCode) {
        qError("Got error rsp, error:%s", tstrerror(rspCode));
        CTG_ERR_RET(rspCode);
      }
dengyihao's avatar
dengyihao 已提交
307

D
dapan1121 已提交
308 309
      qError("invalid req type %s", TMSG_INFO(reqType));
      return TSDB_CODE_APP_ERROR;
D
dapan1121 已提交
310 311 312 313 314
  }

  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
315
int32_t ctgHandleMsgCallback(void* param, SDataBuf* pMsg, int32_t rspCode) {
D
dapan1121 已提交
316
  SCtgTaskCallbackParam* cbParam = (SCtgTaskCallbackParam*)param;
dengyihao's avatar
dengyihao 已提交
317 318 319
  int32_t                code = 0;
  SCtgJob*               pJob = NULL;

D
dapan1121 已提交
320
  CTG_API_JENTER();
D
dapan1121 已提交
321

D
dapan1121 已提交
322
  pJob = taosAcquireRef(gCtgMgmt.jobPool, cbParam->refId);
D
dapan1121 已提交
323
  if (NULL == pJob) {
D
dapan1121 已提交
324
    qDebug("ctg job refId 0x%" PRIx64 " already dropped", cbParam->refId);
D
dapan1121 已提交
325 326 327
    goto _return;
  }

328 329
  SCatalog* pCtg = pJob->pCtg;

D
dapan1121 已提交
330 331 332
  if (TDMT_VND_BATCH_META == cbParam->reqType || TDMT_MND_BATCH_META == cbParam->reqType) {
    CTG_ERR_JRET(ctgHandleBatchRsp(pJob, cbParam, pMsg, rspCode));
  } else {
dengyihao's avatar
dengyihao 已提交
333 334
    int32_t*  taskId = taosArrayGet(cbParam->taskId, 0);
    SCtgTask* pTask = taosArrayGet(pJob->pTasks, *taskId);
D
dapan1121 已提交
335

dengyihao's avatar
dengyihao 已提交
336 337
    qDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId,
           TMSG_INFO(cbParam->reqType + 1));
338 339

#if CTG_BATCH_FETCH
dengyihao's avatar
dengyihao 已提交
340 341
    SHashObj* pBatchs =
        taosHashInit(CTG_DEFAULT_BATCH_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
342 343
    if (NULL == pBatchs) {
      ctgError("taosHashInit %d batch failed", CTG_DEFAULT_BATCH_NUM);
344
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
345 346 347 348
    }
    pTask->pBatchs = pBatchs;
#endif

D
dapan1121 已提交
349 350 351 352
    SCtgTaskReq tReq;
    tReq.pTask = pTask;
    tReq.msgIdx = -1;
    CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].handleRspFp)(&tReq, cbParam->reqType, pMsg, rspCode));
353 354

#if CTG_BATCH_FETCH
dengyihao's avatar
dengyihao 已提交
355 356
    CTG_ERR_JRET(ctgLaunchBatchs(pJob->pCtg, pJob, pBatchs));
#endif
D
dapan1121 已提交
357
  }
dengyihao's avatar
dengyihao 已提交
358

D
dapan1121 已提交
359 360
_return:

D
dapan1121 已提交
361 362
  taosMemoryFree(pMsg->pData);

D
dapan1121 已提交
363 364 365 366 367 368 369
  if (pJob) {
    taosReleaseRef(gCtgMgmt.jobPool, cbParam->refId);
  }

  CTG_API_LEAVE(code);
}

D
dapan1121 已提交
370
int32_t ctgMakeMsgSendInfo(SCtgJob* pJob, SArray* pTaskId, int32_t batchId, SArray* pMsgIdx, int32_t msgType,
dengyihao's avatar
dengyihao 已提交
371
                           SMsgSendInfo** pMsgSendInfo) {
D
dapan1121 已提交
372
  int32_t       code = 0;
dengyihao's avatar
dengyihao 已提交
373
  SMsgSendInfo* msgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
D
dapan1121 已提交
374 375
  if (NULL == msgSendInfo) {
    qError("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
376
    CTG_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
D
dapan1121 已提交
377 378
  }

dengyihao's avatar
dengyihao 已提交
379
  SCtgTaskCallbackParam* param = taosMemoryCalloc(1, sizeof(SCtgTaskCallbackParam));
D
dapan1121 已提交
380 381 382 383 384 385
  if (NULL == param) {
    qError("calloc %d failed", (int32_t)sizeof(SCtgTaskCallbackParam));
    CTG_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
  }

  param->reqType = msgType;
D
dapan1121 已提交
386 387 388 389
  param->queryId = pJob->queryId;
  param->refId = pJob->refId;
  param->taskId = pTaskId;
  param->batchId = batchId;
D
dapan1121 已提交
390
  param->msgIdx = pMsgIdx;
D
dapan1121 已提交
391 392

  msgSendInfo->param = param;
393
  msgSendInfo->paramFreeFp = ctgFreeMsgSendParam;
D
dapan1121 已提交
394 395 396 397 398 399 400 401
  msgSendInfo->fp = ctgHandleMsgCallback;

  *pMsgSendInfo = msgSendInfo;

  return TSDB_CODE_SUCCESS;

_return:

402 403
  taosArrayDestroy(pTaskId);
  destroySendMsgInfo(msgSendInfo);
D
dapan1121 已提交
404 405 406 407

  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
408
int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob* pJob, SArray* pTaskId, int32_t batchId,
D
dapan1121 已提交
409
                        SArray* pMsgIdx, char* dbFName, int32_t vgId, int32_t msgType, void* msg, uint32_t msgSize) {
dengyihao's avatar
dengyihao 已提交
410 411
  int32_t       code = 0;
  SMsgSendInfo* pMsgSendInfo = NULL;
D
dapan1121 已提交
412
  CTG_ERR_JRET(ctgMakeMsgSendInfo(pJob, pTaskId, batchId, pMsgIdx, msgType, &pMsgSendInfo));
D
dapan1121 已提交
413

D
dapan1121 已提交
414
  ctgUpdateSendTargetInfo(pMsgSendInfo, msgType, dbFName, vgId);
D
dapan1121 已提交
415 416 417

  pMsgSendInfo->requestId = pConn->requestId;
  pMsgSendInfo->requestObjRefId = pConn->requestObjRefId;
D
dapan1121 已提交
418 419 420 421 422 423
  pMsgSendInfo->msgInfo.pData = msg;
  pMsgSendInfo->msgInfo.len = msgSize;
  pMsgSendInfo->msgInfo.handle = NULL;
  pMsgSendInfo->msgType = msgType;

  int64_t transporterId = 0;
D
dapan1121 已提交
424
  code = asyncSendMsgToServer(pConn->pTrans, &pConn->mgmtEps, &transporterId, pMsgSendInfo);
425
  pMsgSendInfo = NULL;
D
dapan1121 已提交
426 427 428 429 430
  if (code) {
    ctgError("asyncSendMsgToSever failed, error: %s", tstrerror(code));
    CTG_ERR_JRET(code);
  }

D
dapan1121 已提交
431
  ctgDebug("ctg req msg sent, reqId:0x%" PRIx64 ", msg type:%d, %s", pJob->queryId, msgType, TMSG_INFO(msgType));
D
dapan1121 已提交
432 433 434 435 436
  return TSDB_CODE_SUCCESS;

_return:

  if (pMsgSendInfo) {
D
dapan1121 已提交
437 438 439 440 441 442
    destroySendMsgInfo(pMsgSendInfo);
  }

  CTG_RET(code);
}

D
dapan1121 已提交
443
int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgTaskReq* tReq, int32_t msgType, void* msg,
dengyihao's avatar
dengyihao 已提交
444 445
                    uint32_t msgSize) {
  int32_t    code = 0;
D
dapan1121 已提交
446
  SCtgTask*  pTask = tReq->pTask;
dengyihao's avatar
dengyihao 已提交
447 448
  SHashObj*  pBatchs = pTask->pBatchs;
  SCtgJob*   pJob = pTask->pJob;
D
dapan1121 已提交
449 450 451 452 453
  SCtgBatch* pBatch = taosHashGet(pBatchs, &vgId, sizeof(vgId));
  SCtgBatch newBatch = {0};
  SBatchMsg req = {0};
  
  if (NULL == pBatch) {
D
dapan1121 已提交
454 455
    newBatch.pMsgs = taosArrayInit(pJob->subTaskNum, sizeof(SBatchMsg));
    newBatch.pTaskIds = taosArrayInit(pJob->subTaskNum, sizeof(int32_t));
D
dapan1121 已提交
456 457
    newBatch.pMsgIdxs = taosArrayInit(pJob->subTaskNum, sizeof(int32_t));
    if (NULL == newBatch.pMsgs || NULL == newBatch.pTaskIds || NULL == newBatch.pMsgIdxs) {
D
dapan1121 已提交
458 459
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }
dengyihao's avatar
dengyihao 已提交
460

461
    newBatch.conn = *pConn;
D
dapan1121 已提交
462

D
dapan1121 已提交
463
    req.msgIdx = tReq->msgIdx;
D
dapan1121 已提交
464 465 466 467 468 469 470 471 472
    req.msgType = msgType;
    req.msgLen = msgSize;
    req.msg = msg;
    if (NULL == taosArrayPush(newBatch.pMsgs, &req)) {
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }
    if (NULL == taosArrayPush(newBatch.pTaskIds, &pTask->taskId)) {
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }
D
dapan1121 已提交
473 474 475
    if (NULL == taosArrayPush(newBatch.pMsgIdxs, &req.msgIdx)) {
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }
476
    newBatch.msgSize = sizeof(SBatchReq) + sizeof(req) + msgSize - POINTER_BYTES;
D
dapan1121 已提交
477 478

    if (vgId > 0) {
D
dapan1121 已提交
479
      SName* pName = NULL;
D
dapan1121 已提交
480 481
      if (TDMT_VND_TABLE_CFG == msgType) {
        SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx;
D
dapan1121 已提交
482
        pName = ctx->pName;
D
dapan1121 已提交
483
      } else if (TDMT_VND_TABLE_META == msgType) {
D
dapan1121 已提交
484
        if (CTG_TASK_GET_TB_META_BATCH == pTask->type) {
485
          SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx;
D
dapan1121 已提交
486
          SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx);
487
          pName = ctgGetFetchName(ctx->pNames, fetch);
D
dapan1121 已提交
488 489 490 491
        } else {
          SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
          pName = ctx->pName;
        }
D
dapan1121 已提交
492 493 494 495
      } else {
        ctgError("invalid vnode msgType %d", msgType);
        CTG_ERR_JRET(TSDB_CODE_APP_ERROR);
      }
D
dapan1121 已提交
496 497

      tNameGetFullDbName(pName, newBatch.dbFName);
D
dapan1121 已提交
498 499 500 501 502 503 504 505 506
    }

    newBatch.msgType = (vgId > 0) ? TDMT_VND_BATCH_META : TDMT_MND_BATCH_META;
    newBatch.batchId = atomic_add_fetch_32(&pJob->batchId, 1);

    if (0 != taosHashPut(pBatchs, &vgId, sizeof(vgId), &newBatch, sizeof(newBatch))) {
      CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
    }

dengyihao's avatar
dengyihao 已提交
507 508
    ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), newBatch.batchId,
             vgId);
D
dapan1121 已提交
509 510 511 512

    return TSDB_CODE_SUCCESS;
  }

D
dapan1121 已提交
513
  req.msgIdx = tReq->msgIdx;
D
dapan1121 已提交
514 515 516 517 518 519 520 521 522
  req.msgType = msgType;
  req.msgLen = msgSize;
  req.msg = msg;
  if (NULL == taosArrayPush(pBatch->pMsgs, &req)) {
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }
  if (NULL == taosArrayPush(pBatch->pTaskIds, &pTask->taskId)) {
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }
D
dapan1121 已提交
523 524 525 526
  if (NULL == taosArrayPush(pBatch->pMsgIdxs, &req.msgIdx)) {
    CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
  }
  
D
dapan1121 已提交
527 528 529
  pBatch->msgSize += sizeof(req) + msgSize - POINTER_BYTES;

  if (vgId > 0) {
D
dapan1121 已提交
530
    SName* pName = NULL;
D
dapan1121 已提交
531 532
    if (TDMT_VND_TABLE_CFG == msgType) {
      SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx;
D
dapan1121 已提交
533
      pName = ctx->pName;
D
dapan1121 已提交
534
    } else if (TDMT_VND_TABLE_META == msgType) {
D
dapan1121 已提交
535
      if (CTG_TASK_GET_TB_META_BATCH == pTask->type) {
536
        SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx;
D
dapan1121 已提交
537
        SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx);
538
        pName = ctgGetFetchName(ctx->pNames, fetch);
D
dapan1121 已提交
539 540 541 542
      } else {
        SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
        pName = ctx->pName;
      }
D
dapan1121 已提交
543 544 545 546
    } else {
      ctgError("invalid vnode msgType %d", msgType);
      CTG_ERR_JRET(TSDB_CODE_APP_ERROR);
    }
D
dapan1121 已提交
547 548

    tNameGetFullDbName(pName, newBatch.dbFName);    
D
dapan1121 已提交
549 550
  }

dengyihao's avatar
dengyihao 已提交
551 552
  ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), pBatch->batchId,
           vgId);
D
dapan1121 已提交
553 554 555 556 557 558 559

  return TSDB_CODE_SUCCESS;

_return:

  ctgFreeBatch(&newBatch);
  taosMemoryFree(msg);
dengyihao's avatar
dengyihao 已提交
560

D
dapan1121 已提交
561 562 563
  return code;
}

564
int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) {
D
dapan1121 已提交
565
  *msg = taosMemoryCalloc(1, pBatch->msgSize);
D
dapan1121 已提交
566 567 568 569
  if (NULL == (*msg)) {
    CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }

dengyihao's avatar
dengyihao 已提交
570 571 572
  int32_t    offset = 0;
  int32_t    num = taosArrayGetSize(pBatch->pMsgs);
  SBatchReq* pBatchReq = (SBatchReq*)(*msg);
573 574 575 576

  pBatchReq->header.vgId = htonl(vgId);
  pBatchReq->msgNum = htonl(num);
  offset += sizeof(SBatchReq);
dengyihao's avatar
dengyihao 已提交
577

D
dapan1121 已提交
578 579
  for (int32_t i = 0; i < num; ++i) {
    SBatchMsg* pReq = taosArrayGet(pBatch->pMsgs, i);
D
dapan1121 已提交
580 581
    *(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgIdx);
    offset += sizeof(pReq->msgIdx);    
D
dapan1121 已提交
582 583 584 585 586 587
    *(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgType);
    offset += sizeof(pReq->msgType);
    *(int32_t*)((char*)(*msg) + offset) = htonl(pReq->msgLen);
    offset += sizeof(pReq->msgLen);
    memcpy((char*)(*msg) + offset, pReq->msg, pReq->msgLen);
    offset += pReq->msgLen;
D
dapan1121 已提交
588 589
  }

D
dapan1121 已提交
590 591
  ASSERT(pBatch->msgSize == offset);

592 593
  qDebug("batch req %d to vg %d msg built with %d meta reqs", pBatch->batchId, vgId, num);

D
dapan1121 已提交
594 595 596
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
597
int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob* pJob, SHashObj* pBatchs) {
D
dapan1121 已提交
598
  int32_t code = 0;
dengyihao's avatar
dengyihao 已提交
599 600
  void*   msg = NULL;
  void*   p = taosHashIterate(pBatchs, NULL);
D
dapan1121 已提交
601
  while (NULL != p) {
dengyihao's avatar
dengyihao 已提交
602 603
    size_t     len = 0;
    int32_t*   vgId = taosHashGetKey(p, &len);
D
dapan1121 已提交
604
    SCtgBatch* pBatch = (SCtgBatch*)p;
605 606

    ctgDebug("QID:0x%" PRIx64 " ctg start to launch batch %d", pJob->queryId, pBatch->batchId);
dengyihao's avatar
dengyihao 已提交
607

608
    CTG_ERR_JRET(ctgBuildBatchReqMsg(pBatch, *vgId, &msg));
D
dapan1121 已提交
609 610
    code = ctgAsyncSendMsg(pCtg, &pBatch->conn, pJob, pBatch->pTaskIds, pBatch->batchId, pBatch->pMsgIdxs, 
                           pBatch->dbFName, *vgId, pBatch->msgType, msg, pBatch->msgSize);
611 612
    pBatch->pTaskIds = NULL;
    CTG_ERR_JRET(code);
dengyihao's avatar
dengyihao 已提交
613

D
dapan1121 已提交
614 615 616 617 618 619 620 621 622 623 624
    p = taosHashIterate(pBatchs, p);
  }

  return TSDB_CODE_SUCCESS;

_return:

  if (p) {
    taosHashCancelIterate(pBatchs, p);
  }
  taosMemoryFree(msg);
dengyihao's avatar
dengyihao 已提交
625

D
dapan1121 已提交
626 627 628
  CTG_RET(code);
}

dengyihao's avatar
dengyihao 已提交
629 630
int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray* out, SCtgTask* pTask) {
  char*   msg = NULL;
D
dapan1121 已提交
631 632
  int32_t msgLen = 0;
  int32_t reqType = TDMT_MND_QNODE_LIST;
dengyihao's avatar
dengyihao 已提交
633
  void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
D
dapan1121 已提交
634

D
dapan1121 已提交
635
  ctgDebug("try to get qnode list from mnode, mgmtEpInUse:%d", pConn->mgmtEps.inUse);
D
dapan1121 已提交
636

D
dapan1121 已提交
637
  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](NULL, &msg, 0, &msgLen, mallocFp);
D
dapan1121 已提交
638 639 640 641 642
  if (code) {
    ctgError("Build qnode list msg failed, error:%s", tstrerror(code));
    CTG_ERR_RET(code);
  }

D
dapan1121 已提交
643 644 645 646 647
  if (pTask) {
    void* pOut = taosArrayInit(4, sizeof(SQueryNodeLoad));
    if (NULL == pOut) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
D
dapan1121 已提交
648

D
dapan1121 已提交
649
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, NULL));
D
dapan1121 已提交
650

651
#if CTG_BATCH_FETCH
D
dapan1121 已提交
652 653 654 655
    SCtgTaskReq tReq;
    tReq.pTask = pTask;
    tReq.msgIdx = -1;    
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
656
#else
D
dapan1121 已提交
657 658 659 660 661
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);
dengyihao's avatar
dengyihao 已提交
662

D
dapan1121 已提交
663
    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
664
#endif
D
dapan1121 已提交
665
  }
dengyihao's avatar
dengyihao 已提交
666

D
dapan1121 已提交
667 668
  SRpcMsg rpcMsg = {
      .msgType = reqType,
dengyihao's avatar
dengyihao 已提交
669
      .pCont = msg,
D
dapan1121 已提交
670 671 672 673 674 675 676 677
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);

  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, NULL));

D
dapan1121 已提交
678 679
  rpcFreeCont(rpcRsp.pCont);

D
dapan1121 已提交
680 681 682
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
683 684
int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray** out, SCtgTask* pTask) {
  char*   msg = NULL;
D
dapan1121 已提交
685 686
  int32_t msgLen = 0;
  int32_t reqType = TDMT_MND_DNODE_LIST;
dengyihao's avatar
dengyihao 已提交
687
  void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
D
dapan1121 已提交
688 689 690 691 692 693 694 695 696

  ctgDebug("try to get dnode list from mnode, mgmtEpInUse:%d", pConn->mgmtEps.inUse);

  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](NULL, &msg, 0, &msgLen, mallocFp);
  if (code) {
    ctgError("Build dnode list msg failed, error:%s", tstrerror(code));
    CTG_ERR_RET(code);
  }

D
dapan1121 已提交
697
  if (pTask) {
D
dapan1121 已提交
698
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, NULL, NULL));
D
dapan1121 已提交
699

700
#if CTG_BATCH_FETCH
D
dapan1121 已提交
701 702 703 704
    SCtgTaskReq tReq;
    tReq.pTask = pTask;
    tReq.msgIdx = -1;    
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
705
#else
D
dapan1121 已提交
706 707 708 709 710
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);
dengyihao's avatar
dengyihao 已提交
711

D
dapan1121 已提交
712
    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
713
#endif
D
dapan1121 已提交
714
  }
dengyihao's avatar
dengyihao 已提交
715

D
dapan1121 已提交
716 717
  SRpcMsg rpcMsg = {
      .msgType = reqType,
dengyihao's avatar
dengyihao 已提交
718
      .pCont = msg,
D
dapan1121 已提交
719 720 721 722
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
D
dapan1121 已提交
723
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
724 725 726

  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, NULL));

D
dapan1121 已提交
727 728
  rpcFreeCont(rpcRsp.pCont);

D
dapan1121 已提交
729 730 731
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
732
int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SBuildUseDBInput* input, SUseDbOutput* out,
D
dapan1121 已提交
733
                                SCtgTaskReq* tReq) {
dengyihao's avatar
dengyihao 已提交
734
  char*   msg = NULL;
D
dapan1121 已提交
735 736
  int32_t msgLen = 0;
  int32_t reqType = TDMT_MND_USE_DB;
D
dapan1121 已提交
737
  SCtgTask* pTask = tReq ? tReq->pTask : NULL;
dengyihao's avatar
dengyihao 已提交
738
  void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
D
dapan1121 已提交
739 740 741

  ctgDebug("try to get db vgInfo from mnode, dbFName:%s", input->db);

D
dapan1121 已提交
742
  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](input, &msg, 0, &msgLen, mallocFp);
D
dapan1121 已提交
743 744 745 746 747 748 749 750 751 752
  if (code) {
    ctgError("Build use db msg failed, code:%x, db:%s", code, input->db);
    CTG_ERR_RET(code);
  }

  if (pTask) {
    void* pOut = taosMemoryCalloc(1, sizeof(SUseDbOutput));
    if (NULL == pOut) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
D
dapan1121 已提交
753

D
dapan1121 已提交
754
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx), reqType, pOut, input->db));
D
dapan1121 已提交
755

756
#if CTG_BATCH_FETCH
D
dapan1121 已提交
757
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, tReq, reqType, msg, msgLen));
758
#else
D
dapan1121 已提交
759 760 761 762 763
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);
dengyihao's avatar
dengyihao 已提交
764

D
dapan1121 已提交
765
    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
766
#endif
D
dapan1121 已提交
767
  }
dengyihao's avatar
dengyihao 已提交
768

D
dapan1121 已提交
769 770
  SRpcMsg rpcMsg = {
      .msgType = reqType,
dengyihao's avatar
dengyihao 已提交
771
      .pCont = msg,
D
dapan1121 已提交
772 773 774 775
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
D
dapan1121 已提交
776
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
777 778

  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, input->db));
dengyihao's avatar
dengyihao 已提交
779

D
dapan1121 已提交
780
  rpcFreeCont(rpcRsp.pCont);
dengyihao's avatar
dengyihao 已提交
781

D
dapan1121 已提交
782 783 784
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
785 786 787
int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* dbFName, SDbCfgInfo* out,
                             SCtgTask* pTask) {
  char*   msg = NULL;
D
dapan1121 已提交
788 789
  int32_t msgLen = 0;
  int32_t reqType = TDMT_MND_GET_DB_CFG;
dengyihao's avatar
dengyihao 已提交
790
  void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
D
dapan1121 已提交
791 792 793

  ctgDebug("try to get db cfg from mnode, dbFName:%s", dbFName);

dengyihao's avatar
dengyihao 已提交
794
  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)dbFName, &msg, 0, &msgLen, mallocFp);
D
dapan1121 已提交
795 796 797 798 799 800 801 802 803 804
  if (code) {
    ctgError("Build get db cfg msg failed, code:%x, db:%s", code, dbFName);
    CTG_ERR_RET(code);
  }

  if (pTask) {
    void* pOut = taosMemoryCalloc(1, sizeof(SDbCfgInfo));
    if (NULL == pOut) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
D
dapan1121 已提交
805

D
dapan1121 已提交
806
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, (char*)dbFName));
D
dapan1121 已提交
807

808
#if CTG_BATCH_FETCH
D
dapan1121 已提交
809 810 811 812
    SCtgTaskReq tReq;
    tReq.pTask = pTask;
    tReq.msgIdx = -1;    
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
813
#else
D
dapan1121 已提交
814 815 816 817 818
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);
dengyihao's avatar
dengyihao 已提交
819

D
dapan1121 已提交
820
    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
821
#endif
D
dapan1121 已提交
822
  }
dengyihao's avatar
dengyihao 已提交
823

D
dapan1121 已提交
824 825
  SRpcMsg rpcMsg = {
      .msgType = TDMT_MND_GET_DB_CFG,
dengyihao's avatar
dengyihao 已提交
826
      .pCont = msg,
D
dapan1121 已提交
827 828 829 830
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
D
dapan1121 已提交
831
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
832 833 834

  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)dbFName));

D
dapan1121 已提交
835 836
  rpcFreeCont(rpcRsp.pCont);

D
dapan1121 已提交
837 838 839
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
840 841 842
int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* indexName, SIndexInfo* out,
                                 SCtgTask* pTask) {
  char*   msg = NULL;
D
dapan1121 已提交
843 844
  int32_t msgLen = 0;
  int32_t reqType = TDMT_MND_GET_INDEX;
dengyihao's avatar
dengyihao 已提交
845
  void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
D
dapan1121 已提交
846 847 848

  ctgDebug("try to get index from mnode, indexName:%s", indexName);

dengyihao's avatar
dengyihao 已提交
849
  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)indexName, &msg, 0, &msgLen, mallocFp);
D
dapan1121 已提交
850 851 852 853 854 855 856 857 858 859
  if (code) {
    ctgError("Build get index msg failed, code:%x, db:%s", code, indexName);
    CTG_ERR_RET(code);
  }

  if (pTask) {
    void* pOut = taosMemoryCalloc(1, sizeof(SIndexInfo));
    if (NULL == pOut) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
D
dapan1121 已提交
860

D
dapan1121 已提交
861
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, (char*)indexName));
862 863

#if CTG_BATCH_FETCH
D
dapan1121 已提交
864 865 866 867
    SCtgTaskReq tReq;
    tReq.pTask = pTask;
    tReq.msgIdx = -1;    
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
dengyihao's avatar
dengyihao 已提交
868
#else
D
dapan1121 已提交
869 870 871 872 873
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);
dengyihao's avatar
dengyihao 已提交
874

D
dapan1121 已提交
875
    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
876
#endif
D
dapan1121 已提交
877
  }
dengyihao's avatar
dengyihao 已提交
878

D
dapan1121 已提交
879 880
  SRpcMsg rpcMsg = {
      .msgType = reqType,
dengyihao's avatar
dengyihao 已提交
881
      .pCont = msg,
D
dapan1121 已提交
882 883 884 885
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
D
dapan1121 已提交
886
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
887 888

  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)indexName));
D
dapan1121 已提交
889 890

  rpcFreeCont(rpcRsp.pCont);
dengyihao's avatar
dengyihao 已提交
891

D
dapan1121 已提交
892 893 894
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
895 896 897
int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* name, STableIndex* out,
                               SCtgTask* pTask) {
  char*   msg = NULL;
D
dapan1121 已提交
898 899
  int32_t msgLen = 0;
  int32_t reqType = TDMT_MND_GET_TABLE_INDEX;
dengyihao's avatar
dengyihao 已提交
900
  void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
D
dapan1121 已提交
901 902
  char tbFName[TSDB_TABLE_FNAME_LEN];
  tNameExtractFullName(name, tbFName);
D
dapan1121 已提交
903 904 905

  ctgDebug("try to get tb index from mnode, tbFName:%s", tbFName);

dengyihao's avatar
dengyihao 已提交
906
  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)tbFName, &msg, 0, &msgLen, mallocFp);
D
dapan1121 已提交
907 908 909 910 911 912
  if (code) {
    ctgError("Build get index msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName);
    CTG_ERR_RET(code);
  }

  if (pTask) {
D
dapan1121 已提交
913 914 915 916 917
    void* pOut = taosMemoryCalloc(1, sizeof(STableIndex));
    if (NULL == pOut) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    
D
dapan1121 已提交
918
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, (char*)tbFName));
919 920

#if CTG_BATCH_FETCH
D
dapan1121 已提交
921 922 923 924
    SCtgTaskReq tReq;
    tReq.pTask = pTask;
    tReq.msgIdx = -1;    
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
dengyihao's avatar
dengyihao 已提交
925
#else
D
dapan1121 已提交
926 927 928 929 930
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);
dengyihao's avatar
dengyihao 已提交
931

D
dapan1121 已提交
932
    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
933
#endif
D
dapan1121 已提交
934
  }
dengyihao's avatar
dengyihao 已提交
935

D
dapan1121 已提交
936 937
  SRpcMsg rpcMsg = {
      .msgType = reqType,
dengyihao's avatar
dengyihao 已提交
938
      .pCont = msg,
D
dapan1121 已提交
939 940 941 942
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
D
dapan1121 已提交
943
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
944 945

  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName));
D
dapan1121 已提交
946 947

  rpcFreeCont(rpcRsp.pCont);
dengyihao's avatar
dengyihao 已提交
948

D
dapan1121 已提交
949 950 951
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
952 953 954
int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* funcName, SFuncInfo* out,
                               SCtgTask* pTask) {
  char*   msg = NULL;
D
dapan1121 已提交
955 956
  int32_t msgLen = 0;
  int32_t reqType = TDMT_MND_RETRIEVE_FUNC;
dengyihao's avatar
dengyihao 已提交
957
  void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
D
dapan1121 已提交
958 959 960

  ctgDebug("try to get udf info from mnode, funcName:%s", funcName);

dengyihao's avatar
dengyihao 已提交
961
  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)funcName, &msg, 0, &msgLen, mallocFp);
D
dapan1121 已提交
962 963 964 965 966 967 968 969 970 971
  if (code) {
    ctgError("Build get udf msg failed, code:%x, db:%s", code, funcName);
    CTG_ERR_RET(code);
  }

  if (pTask) {
    void* pOut = taosMemoryCalloc(1, sizeof(SFuncInfo));
    if (NULL == pOut) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
D
dapan1121 已提交
972

D
dapan1121 已提交
973
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, (char*)funcName));
974 975

#if CTG_BATCH_FETCH
D
dapan1121 已提交
976 977 978 979
    SCtgTaskReq tReq;
    tReq.pTask = pTask;
    tReq.msgIdx = -1;    
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
980
#else
D
dapan1121 已提交
981 982 983 984 985
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);
dengyihao's avatar
dengyihao 已提交
986

D
dapan1121 已提交
987
    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
988
#endif
D
dapan1121 已提交
989
  }
dengyihao's avatar
dengyihao 已提交
990

D
dapan1121 已提交
991 992
  SRpcMsg rpcMsg = {
      .msgType = reqType,
dengyihao's avatar
dengyihao 已提交
993
      .pCont = msg,
D
dapan1121 已提交
994 995 996 997
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
D
dapan1121 已提交
998
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
999 1000 1001

  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)funcName));

D
dapan1121 已提交
1002 1003
  rpcFreeCont(rpcRsp.pCont);

D
dapan1121 已提交
1004 1005 1006
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1007 1008 1009
int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char* user, SGetUserAuthRsp* out,
                                  SCtgTask* pTask) {
  char*   msg = NULL;
D
dapan1121 已提交
1010 1011
  int32_t msgLen = 0;
  int32_t reqType = TDMT_MND_GET_USER_AUTH;
dengyihao's avatar
dengyihao 已提交
1012
  void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
D
dapan1121 已提交
1013 1014 1015

  ctgDebug("try to get user auth from mnode, user:%s", user);

dengyihao's avatar
dengyihao 已提交
1016
  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)]((void*)user, &msg, 0, &msgLen, mallocFp);
D
dapan1121 已提交
1017 1018 1019 1020 1021 1022 1023 1024 1025 1026
  if (code) {
    ctgError("Build get user auth msg failed, code:%x, db:%s", code, user);
    CTG_ERR_RET(code);
  }

  if (pTask) {
    void* pOut = taosMemoryCalloc(1, sizeof(SGetUserAuthRsp));
    if (NULL == pOut) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
D
dapan1121 已提交
1027

D
dapan1121 已提交
1028
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, (char*)user));
1029 1030

#if CTG_BATCH_FETCH
D
dapan1121 已提交
1031 1032 1033 1034
    SCtgTaskReq tReq;
    tReq.pTask = pTask;
    tReq.msgIdx = -1;    
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
dengyihao's avatar
dengyihao 已提交
1035
#else
D
dapan1121 已提交
1036 1037 1038 1039 1040
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);
dengyihao's avatar
dengyihao 已提交
1041

D
dapan1121 已提交
1042
    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
1043
#endif
D
dapan1121 已提交
1044
  }
dengyihao's avatar
dengyihao 已提交
1045

D
dapan1121 已提交
1046 1047
  SRpcMsg rpcMsg = {
      .msgType = reqType,
dengyihao's avatar
dengyihao 已提交
1048
      .pCont = msg,
D
dapan1121 已提交
1049 1050 1051 1052
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
D
dapan1121 已提交
1053
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
1054 1055

  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)user));
D
dapan1121 已提交
1056 1057

  rpcFreeCont(rpcRsp.pCont);
dengyihao's avatar
dengyihao 已提交
1058

D
dapan1121 已提交
1059 1060 1061
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1062
int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo* pConn, char* dbFName, char* tbName,
D
dapan1121 已提交
1063 1064
                                  STableMetaOutput* out, SCtgTaskReq* tReq) {
  SCtgTask *pTask = tReq ? tReq->pTask : NULL;                                  
D
dapan1121 已提交
1065
  SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = tbName};
dengyihao's avatar
dengyihao 已提交
1066 1067 1068 1069 1070
  char*            msg = NULL;
  SEpSet*          pVnodeEpSet = NULL;
  int32_t          msgLen = 0;
  int32_t          reqType = TDMT_MND_TABLE_META;
  char             tbFName[TSDB_TABLE_FNAME_LEN];
D
dapan1121 已提交
1071
  sprintf(tbFName, "%s.%s", dbFName, tbName);
dengyihao's avatar
dengyihao 已提交
1072
  void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
D
dapan1121 已提交
1073 1074 1075

  ctgDebug("try to get table meta from mnode, tbFName:%s", tbFName);

D
dapan1121 已提交
1076
  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp);
D
dapan1121 已提交
1077 1078 1079 1080 1081 1082 1083 1084 1085 1086
  if (code) {
    ctgError("Build mnode stablemeta msg failed, code:%x", code);
    CTG_ERR_RET(code);
  }

  if (pTask) {
    void* pOut = taosMemoryCalloc(1, sizeof(STableMetaOutput));
    if (NULL == pOut) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
D
dapan1121 已提交
1087

D
dapan1121 已提交
1088
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx), reqType, pOut, tbFName));
D
dapan1121 已提交
1089
    
1090
#if CTG_BATCH_FETCH
D
dapan1121 已提交
1091
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, tReq, reqType, msg, msgLen));
dengyihao's avatar
dengyihao 已提交
1092
#else
D
dapan1121 已提交
1093 1094 1095 1096 1097
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);
dengyihao's avatar
dengyihao 已提交
1098

D
dapan1121 已提交
1099
    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
1100
#endif
D
dapan1121 已提交
1101 1102 1103 1104
  }

  SRpcMsg rpcMsg = {
      .msgType = reqType,
dengyihao's avatar
dengyihao 已提交
1105
      .pCont = msg,
D
dapan1121 已提交
1106 1107 1108 1109
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
D
dapan1121 已提交
1110
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);
dengyihao's avatar
dengyihao 已提交
1111

D
dapan1121 已提交
1112 1113
  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, tbFName));

D
dapan1121 已提交
1114 1115
  rpcFreeCont(rpcRsp.pCont);

D
dapan1121 已提交
1116 1117 1118
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1119
int32_t ctgGetTbMetaFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, STableMetaOutput* out,
D
dapan1121 已提交
1120
                              SCtgTaskReq* tReq) {
D
dapan1121 已提交
1121 1122 1123
  char dbFName[TSDB_DB_FNAME_LEN];
  tNameGetFullDbName(pTableName, dbFName);

D
dapan1121 已提交
1124
  return ctgGetTbMetaFromMnodeImpl(pCtg, pConn, dbFName, (char*)pTableName->tname, out, tReq);
D
dapan1121 已提交
1125 1126
}

dengyihao's avatar
dengyihao 已提交
1127
int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* vgroupInfo,
D
dapan1121 已提交
1128 1129
                              STableMetaOutput* out, SCtgTaskReq* tReq) {
  SCtgTask *pTask = tReq ? tReq->pTask : NULL;
D
dapan1121 已提交
1130 1131 1132
  char dbFName[TSDB_DB_FNAME_LEN];
  tNameGetFullDbName(pTableName, dbFName);
  int32_t reqType = TDMT_VND_TABLE_META;
dengyihao's avatar
dengyihao 已提交
1133
  char    tbFName[TSDB_TABLE_FNAME_LEN];
D
dapan1121 已提交
1134
  sprintf(tbFName, "%s.%s", dbFName, pTableName->tname);
dengyihao's avatar
dengyihao 已提交
1135
  void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
D
dapan1121 已提交
1136

D
dapan1121 已提交
1137
  SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse];
dengyihao's avatar
dengyihao 已提交
1138 1139
  ctgDebug("try to get table meta from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s", vgroupInfo->vgId,
           vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName);
D
dapan1121 已提交
1140

dengyihao's avatar
dengyihao 已提交
1141 1142 1143
  SBuildTableInput bInput = {
      .vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char*)tNameGetTableName(pTableName)};
  char*   msg = NULL;
D
dapan1121 已提交
1144 1145
  int32_t msgLen = 0;

D
dapan1121 已提交
1146
  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp);
D
dapan1121 已提交
1147 1148 1149 1150 1151 1152 1153 1154 1155 1156
  if (code) {
    ctgError("Build vnode tablemeta msg failed, code:%x, tbFName:%s", code, tbFName);
    CTG_ERR_RET(code);
  }

  if (pTask) {
    void* pOut = taosMemoryCalloc(1, sizeof(STableMetaOutput));
    if (NULL == pOut) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
D
dapan1121 已提交
1157

dengyihao's avatar
dengyihao 已提交
1158 1159 1160 1161
    SRequestConnInfo vConn = {.pTrans = pConn->pTrans,
                              .requestId = pConn->requestId,
                              .requestObjRefId = pConn->requestObjRefId,
                              .mgmtEps = vgroupInfo->epSet};
D
dapan1121 已提交
1162

D
dapan1121 已提交
1163
  CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx), reqType, pOut, tbFName));
D
dapan1121 已提交
1164

D
dapan1121 已提交
1165
#if CTG_BATCH_FETCH
D
dapan1121 已提交
1166
    CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, tReq, reqType, msg, msgLen));
dengyihao's avatar
dengyihao 已提交
1167
#else
D
dapan1121 已提交
1168
    SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
dengyihao's avatar
dengyihao 已提交
1169
    char           dbFName[TSDB_DB_FNAME_LEN];
D
dapan1121 已提交
1170 1171 1172 1173 1174 1175 1176
    tNameGetFullDbName(ctx->pName, dbFName);
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);

D
dapan1121 已提交
1177
    CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, NULL, dbFName, ctx->vgId, reqType, msg, msgLen));
dengyihao's avatar
dengyihao 已提交
1178
#endif
D
dapan1121 已提交
1179 1180 1181 1182
  }

  SRpcMsg rpcMsg = {
      .msgType = reqType,
dengyihao's avatar
dengyihao 已提交
1183
      .pCont = msg,
D
dapan1121 已提交
1184 1185 1186 1187
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
D
dapan1121 已提交
1188
  rpcSendRecv(pConn->pTrans, &vgroupInfo->epSet, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
1189

dengyihao's avatar
dengyihao 已提交
1190
  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, tbFName));
D
dapan1121 已提交
1191

D
dapan1121 已提交
1192 1193
  rpcFreeCont(rpcRsp.pCont);

D
dapan1121 已提交
1194 1195 1196
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1197 1198 1199
int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName,
                                SVgroupInfo* vgroupInfo, STableCfg** out, SCtgTask* pTask) {
  char*   msg = NULL;
D
dapan1121 已提交
1200 1201
  int32_t msgLen = 0;
  int32_t reqType = TDMT_VND_TABLE_CFG;
dengyihao's avatar
dengyihao 已提交
1202
  char    tbFName[TSDB_TABLE_FNAME_LEN];
D
dapan1121 已提交
1203
  tNameExtractFullName(pTableName, tbFName);
dengyihao's avatar
dengyihao 已提交
1204
  void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
D
dapan1121 已提交
1205 1206
  char dbFName[TSDB_DB_FNAME_LEN];
  tNameGetFullDbName(pTableName, dbFName);
D
dapan1121 已提交
1207
  SBuildTableInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char*)pTableName->tname};
D
dapan1121 已提交
1208

D
dapan1121 已提交
1209
  SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse];
dengyihao's avatar
dengyihao 已提交
1210 1211
  ctgDebug("try to get table cfg from vnode, vgId:%d, ep num:%d, ep %s:%d, tbFName:%s", vgroupInfo->vgId,
           vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName);
D
dapan1121 已提交
1212 1213 1214 1215 1216 1217 1218 1219

  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp);
  if (code) {
    ctgError("Build get tb cfg msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName);
    CTG_ERR_RET(code);
  }

  if (pTask) {
D
dapan1121 已提交
1220
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, NULL, (char*)tbFName));
D
dapan1121 已提交
1221

dengyihao's avatar
dengyihao 已提交
1222 1223 1224 1225
    SRequestConnInfo vConn = {.pTrans = pConn->pTrans,
                              .requestId = pConn->requestId,
                              .requestObjRefId = pConn->requestObjRefId,
                              .mgmtEps = vgroupInfo->epSet};
D
dapan1121 已提交
1226
#if CTG_BATCH_FETCH
D
dapan1121 已提交
1227 1228 1229 1230
    SCtgTaskReq tReq;
    tReq.pTask = pTask;
    tReq.msgIdx = -1;    
    CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, &tReq, reqType, msg, msgLen));
D
dapan1121 已提交
1231 1232
#else
    SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx;
dengyihao's avatar
dengyihao 已提交
1233
    char          dbFName[TSDB_DB_FNAME_LEN];
D
dapan1121 已提交
1234 1235 1236 1237 1238 1239
    tNameGetFullDbName(ctx->pName, dbFName);
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);
dengyihao's avatar
dengyihao 已提交
1240

D
dapan1121 已提交
1241
    CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, NULL, dbFName, ctx->pVgInfo->vgId, reqType, msg, msgLen));
dengyihao's avatar
dengyihao 已提交
1242
#endif
D
dapan1121 已提交
1243
  }
dengyihao's avatar
dengyihao 已提交
1244

D
dapan1121 已提交
1245 1246
  SRpcMsg rpcMsg = {
      .msgType = reqType,
dengyihao's avatar
dengyihao 已提交
1247
      .pCont = msg,
D
dapan1121 已提交
1248 1249 1250 1251
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
D
dapan1121 已提交
1252
  rpcSendRecv(pConn->pTrans, &vgroupInfo->epSet, &rpcMsg, &rpcRsp);
D
dapan1121 已提交
1253 1254

  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName));
D
dapan1121 已提交
1255 1256

  rpcFreeCont(rpcRsp.pCont);
dengyihao's avatar
dengyihao 已提交
1257

D
dapan1121 已提交
1258 1259 1260
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1261 1262 1263
int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, STableCfg** out,
                                SCtgTask* pTask) {
  char*   msg = NULL;
D
dapan1121 已提交
1264 1265
  int32_t msgLen = 0;
  int32_t reqType = TDMT_MND_TABLE_CFG;
dengyihao's avatar
dengyihao 已提交
1266
  char    tbFName[TSDB_TABLE_FNAME_LEN];
D
dapan1121 已提交
1267
  tNameExtractFullName(pTableName, tbFName);
dengyihao's avatar
dengyihao 已提交
1268
  void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
D
dapan1121 已提交
1269 1270
  char dbFName[TSDB_DB_FNAME_LEN];
  tNameGetFullDbName(pTableName, dbFName);
D
dapan1121 已提交
1271
  SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = (char*)pTableName->tname};
D
dapan1121 已提交
1272 1273 1274 1275 1276 1277 1278 1279 1280 1281

  ctgDebug("try to get table cfg from mnode, tbFName:%s", tbFName);

  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp);
  if (code) {
    ctgError("Build get tb cfg msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName);
    CTG_ERR_RET(code);
  }

  if (pTask) {
D
dapan1121 已提交
1282
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, NULL, (char*)tbFName));
D
dapan1121 已提交
1283

1284
#if CTG_BATCH_FETCH
D
dapan1121 已提交
1285 1286 1287 1288
    SCtgTaskReq tReq;
    tReq.pTask = pTask;
    tReq.msgIdx = -1;    
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
dengyihao's avatar
dengyihao 已提交
1289
#else
D
dapan1121 已提交
1290 1291 1292 1293 1294
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);
dengyihao's avatar
dengyihao 已提交
1295

D
dapan1121 已提交
1296
    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
1297
#endif
D
dapan1121 已提交
1298
  }
dengyihao's avatar
dengyihao 已提交
1299

D
dapan1121 已提交
1300 1301
  SRpcMsg rpcMsg = {
      .msgType = reqType,
dengyihao's avatar
dengyihao 已提交
1302
      .pCont = msg,
D
dapan1121 已提交
1303 1304 1305 1306 1307 1308 1309
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);

  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName));
D
dapan1121 已提交
1310 1311

  rpcFreeCont(rpcRsp.pCont);
dengyihao's avatar
dengyihao 已提交
1312

D
dapan1121 已提交
1313 1314 1315
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
1316 1317
int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, char** out, SCtgTask* pTask) {
  char*   msg = NULL;
D
dapan1121 已提交
1318 1319
  int32_t msgLen = 0;
  int32_t reqType = TDMT_MND_SERVER_VERSION;
dengyihao's avatar
dengyihao 已提交
1320
  void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
D
dapan1121 已提交
1321 1322 1323 1324 1325 1326 1327 1328 1329 1330

  qDebug("try to get svr ver from mnode");

  int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](NULL, &msg, 0, &msgLen, mallocFp);
  if (code) {
    ctgError("Build get svr ver msg failed, code:%s", tstrerror(code));
    CTG_ERR_RET(code);
  }

  if (pTask) {
D
dapan1121 已提交
1331
    CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, NULL, NULL));
1332 1333

#if CTG_BATCH_FETCH
D
dapan1121 已提交
1334 1335 1336 1337
    SCtgTaskReq tReq;
    tReq.pTask = pTask;
    tReq.msgIdx = -1;    
    CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
dengyihao's avatar
dengyihao 已提交
1338
#else
D
dapan1121 已提交
1339 1340 1341 1342 1343
    SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
    if (NULL == pTaskId) {
      CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
    }
    taosArrayPush(pTaskId, &pTask->taskId);
dengyihao's avatar
dengyihao 已提交
1344

D
dapan1121 已提交
1345
    CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
1346
#endif
D
dapan1121 已提交
1347
  }
dengyihao's avatar
dengyihao 已提交
1348

D
dapan1121 已提交
1349 1350
  SRpcMsg rpcMsg = {
      .msgType = reqType,
dengyihao's avatar
dengyihao 已提交
1351
      .pCont = msg,
D
dapan1121 已提交
1352 1353 1354 1355 1356 1357 1358
      .contLen = msgLen,
  };

  SRpcMsg rpcRsp = {0};
  rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);

  CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, NULL));
D
dapan1121 已提交
1359 1360

  rpcFreeCont(rpcRsp.pCont);
dengyihao's avatar
dengyihao 已提交
1361

D
dapan1121 已提交
1362 1363
  return TSDB_CODE_SUCCESS;
}