clientMsgHandler.c 17.4 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "catalog.h"
H
Haojun Liao 已提交
17
#include "clientInt.h"
18
#include "clientLog.h"
L
Liu Jicong 已提交
19
#include "os.h"
D
dapan1121 已提交
20
#include "query.h"
21 22
#include "systable.h"
#include "tdatablock.h"
L
Liu Jicong 已提交
23
#include "tdef.h"
24
#include "tglobal.h"
L
Liu Jicong 已提交
25
#include "tname.h"
26
#include "tversion.h"
H
Haojun Liao 已提交
27

H
Haojun Liao 已提交
28 29 30 31 32
/*
 * Record a result code for a request.
 *
 * The same value is stored in `terrno` and in the request's own `code`
 * field.
 *
 * @param pRequest request object to update (dereferenced unconditionally)
 * @param code     result code to record
 */
static void setErrno(SRequestObj* pRequest, int32_t code) {
  terrno = code;
  pRequest->code = code;
}

D
dapan1121 已提交
33
/*
 * Default response handler, used for every message type that has no
 * dedicated handler in getMsgRspHandle().
 *
 * Records `code` on the request, drops cached table meta for request types
 * selected by NEED_CLIENT_RM_TBLMETA_REQ, releases the two buffers carried
 * by `pMsg`, and finally notifies the issuer: through body.queryFp when one
 * is registered, otherwise by posting body.rspSem.
 *
 * @param param the SRequestObj this response belongs to
 * @param pMsg  response buffer; pData and pEpSet are freed here
 * @param code  result code delivered with the response
 * @return `code`, unchanged
 */
int32_t genericRspCallback(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pReq = param;

  setErrno(pReq, code);

  if (NEED_CLIENT_RM_TBLMETA_REQ(pReq->type)) {
    removeMeta(pReq->pTscObj, pReq->targetTableList);
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  // No callback registered: wake whoever waits on the response semaphore.
  if (NULL == pReq->body.queryFp) {
    tsem_post(&pReq->body.rspSem);
    return code;
  }

  pReq->body.queryFp(pReq->body.param, pReq, code);
  return code;
}

D
dapan1121 已提交
51
/*
 * Handle the response to a connect request.
 *
 * `param` points to a heap-allocated int64_t request reference id; it is
 * resolved with acquireRequest() and freed before returning. The response is
 * deserialized and validated (version compatibility, clock skew against
 * timestampDeltaLimit, non-empty epSet). On success the connection object is
 * populated from the response, the mnode epSet is refreshed, and the
 * connection is registered with the heartbeat manager.
 *
 * Every failure path records the error on the request via setErrno() and
 * posts body.rspSem; the success path posts the semaphore as well. The
 * returned value is always the code that was recorded, including for the
 * "disconnected" and "empty epSet" failures.
 *
 * @param param heap-allocated int64_t request ref id (freed here)
 * @param pMsg  response buffer; pData and pEpSet are freed here
 * @param code  transport/server result code
 * @return TSDB_CODE_SUCCESS on success, otherwise the error recorded on the
 *         request (or the incoming `code` if the request no longer exists)
 */
int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
  // Declared up front so that no goto below skips an initializer.
  STscObj*    pTscObj = NULL;
  SConnectRsp connectRsp = {0};
  int32_t     now = 0;
  int32_t     delta = 0;

  SRequestObj* pRequest = acquireRequest(*(int64_t*)param);
  if (NULL == pRequest) {
    goto End;
  }

  if (code != TSDB_CODE_SUCCESS) {
    goto Fail;
  }

  pTscObj = pRequest->pTscObj;
  if (NULL == pTscObj->pAppInfo || NULL == pTscObj->pAppInfo->pAppHbMgr) {
    code = TSDB_CODE_TSC_DISCONNECTED;
    goto Fail;
  }

  if (tDeserializeSConnectRsp(pMsg->pData, pMsg->len, &connectRsp) != 0) {
    code = TSDB_CODE_TSC_INVALID_VERSION;
    goto Fail;
  }

  if ((code = taosCheckVersionCompatibleFromStr(version, connectRsp.sVer, 3)) != 0) {
    goto Fail;
  }

  // Refuse the connection when client and server clocks differ too much.
  now = taosGetTimestampSec();
  delta = abs(now - connectRsp.svrTimestamp);
  if (delta > timestampDeltaLimit) {
    code = TSDB_CODE_TIME_UNSYNCED;
    tscError("time diff:%ds is too big", delta);
    goto Fail;
  }

  if (connectRsp.epSet.numOfEps == 0) {
    code = TSDB_CODE_APP_ERROR;
    goto Fail;
  }

  if (connectRsp.dnodeNum == 1) {
    SEpSet srcEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
    SEpSet dstEpSet = connectRsp.epSet;
    rpcSetDefaultAddr(pTscObj->pAppInfo->pTransporter, srcEpSet.eps[srcEpSet.inUse].fqdn,
                      dstEpSet.eps[dstEpSet.inUse].fqdn);
  } else if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &connectRsp.epSet)) {
    SEpSet* pOrig = &pTscObj->pAppInfo->mgmtEp.epSet;
    SEp*    pOrigEp = &pOrig->eps[pOrig->inUse];
    SEp*    pNewEp = &connectRsp.epSet.eps[connectRsp.epSet.inUse];
    tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in connRsp", pOrig->inUse, pOrig->numOfEps,
             pOrigEp->fqdn, pOrigEp->port, connectRsp.epSet.inUse, connectRsp.epSet.numOfEps, pNewEp->fqdn,
             pNewEp->port);
    updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &connectRsp.epSet);
  }

  for (int32_t i = 0; i < connectRsp.epSet.numOfEps; ++i) {
    tscDebug("0x%" PRIx64 " epSet.fqdn[%d]:%s port:%d, connObj:0x%" PRIx64, pRequest->requestId, i,
             connectRsp.epSet.eps[i].fqdn, connectRsp.epSet.eps[i].port, pTscObj->id);
  }

  pTscObj->sysInfo = connectRsp.sysInfo;
  pTscObj->connId = connectRsp.connId;
  pTscObj->acctId = connectRsp.acctId;
  tstrncpy(pTscObj->sVer, connectRsp.sVer, tListLen(pTscObj->sVer));
  tstrncpy(pTscObj->sDetailVer, connectRsp.sDetailVer, tListLen(pTscObj->sDetailVer));

  // update the appInstInfo
  pTscObj->pAppInfo->clusterId = connectRsp.clusterId;
  lastClusterId = connectRsp.clusterId;

  pTscObj->connType = connectRsp.connType;
  pTscObj->passInfo.ver = connectRsp.passVer;

  hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pTscObj->id, connectRsp.clusterId, connectRsp.connType);

  tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, connectRsp.clusterId,
           pTscObj->pAppInfo->numOfConns);

  tsem_post(&pRequest->body.rspSem);
  goto End;

Fail:
  // Single failure exit: record the error on the request and wake the waiter.
  setErrno(pRequest, code);
  tsem_post(&pRequest->body.rspSem);

End:
  if (pRequest) {
    releaseRequest(pRequest->self);
  }

  taosMemoryFree(param);
  taosMemoryFree(pMsg->pEpSet);
  taosMemoryFree(pMsg->pData);
  return code;
}
H
Haojun Liao 已提交
152

L
Liu Jicong 已提交
153
/*
 * Allocate and fill the send descriptor for a request addressed to the mnode.
 *
 * The descriptor copies the request's ids, message type and request message,
 * keeps a pointer to the request as the callback parameter, and selects the
 * response handler via getMsgRspHandle().
 *
 * @param pRequest request to build the descriptor for
 * @return newly allocated SMsgSendInfo, or NULL if allocation fails, in
 *         which case terrno is set to TSDB_CODE_OUT_OF_MEMORY
 */
SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pRequest) {
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pMsgSendInfo) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pMsgSendInfo->requestObjRefId = pRequest->self;
  pMsgSendInfo->requestId = pRequest->requestId;
  pMsgSendInfo->param = pRequest;
  pMsgSendInfo->msgType = pRequest->type;
  pMsgSendInfo->target.type = TARGET_TYPE_MNODE;

  pMsgSendInfo->msgInfo = pRequest->body.requestMsg;
  pMsgSendInfo->fp = getMsgRspHandle(pRequest->type);
  return pMsgSendInfo;
}

D
dapan1121 已提交
167
/*
 * Handle the response to a create-database request.
 *
 * The response payload is not inspected (TODO: rsp with the vnode id list);
 * both buffers in `pMsg` are released immediately. On failure the code is
 * recorded on the request. On success the vgroup info of the information
 * schema and performance schema databases is refreshed in the catalog; a
 * failure to obtain the catalog handle is ignored and does not affect the
 * reported result.
 *
 * @param param the SRequestObj this response belongs to
 * @param pMsg  response buffer; pData and pEpSet are freed here
 * @param code  result code delivered with the response
 * @return `code`, unchanged
 */
int32_t processCreateDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pReq = param;

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  if (TSDB_CODE_SUCCESS == code) {
    struct SCatalog* pCtg = NULL;
    if (TSDB_CODE_SUCCESS == catalogGetHandle(pReq->pTscObj->pAppInfo->clusterId, &pCtg)) {
      STscObj* pConn = pReq->pTscObj;

      SRequestConnInfo connInfo = {.pTrans = pConn->pAppInfo->pTransporter,
                                   .requestId = pReq->requestId,
                                   .requestObjRefId = pReq->self,
                                   .mgmtEps = getEpSet_s(&pConn->pAppInfo->mgmtEp)};
      char             fullName[TSDB_DB_FNAME_LEN];

      snprintf(fullName, sizeof(fullName) - 1, "%d.%s", pConn->acctId, TSDB_INFORMATION_SCHEMA_DB);
      catalogRefreshDBVgInfo(pCtg, &connInfo, fullName);

      snprintf(fullName, sizeof(fullName) - 1, "%d.%s", pConn->acctId, TSDB_PERFORMANCE_SCHEMA_DB);
      catalogRefreshDBVgInfo(pCtg, &connInfo, fullName);
    }
  } else {
    setErrno(pReq, code);
  }

  // Notify the issuer: callback if registered, semaphore otherwise.
  if (pReq->body.queryFp) {
    pReq->body.queryFp(pReq->body.param, pReq, code);
  } else {
    tsem_post(&pReq->body.rspSem);
  }
  return code;
}
H
Haojun Liao 已提交
199

D
dapan1121 已提交
200
int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
H
Haojun Liao 已提交
201 202
  SRequestObj* pRequest = param;

203 204
  if (TSDB_CODE_MND_DB_NOT_EXIST == code || TSDB_CODE_MND_DB_IN_CREATING == code ||
      TSDB_CODE_MND_DB_IN_DROPPING == code) {
D
dapan1121 已提交
205 206
    SUseDbRsp usedbRsp = {0};
    tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp);
L
Liu Jicong 已提交
207
    struct SCatalog* pCatalog = NULL;
D
dapan1121 已提交
208

209
    if (usedbRsp.vgVersion >= 0) {  // cached in local
210
      uint64_t clusterId = pRequest->pTscObj->pAppInfo->clusterId;
dengyihao's avatar
dengyihao 已提交
211
      int32_t  code1 = catalogGetHandle(clusterId, &pCatalog);
212
      if (code1 != TSDB_CODE_SUCCESS) {
dengyihao's avatar
dengyihao 已提交
213 214
        tscWarn("0x%" PRIx64 "catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->requestId, clusterId,
                tstrerror(code1));
D
dapan1121 已提交
215 216 217 218 219
      } else {
        catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid);
      }
    }

L
Liu Jicong 已提交
220
    tFreeSUsedbRsp(&usedbRsp);
D
dapan1121 已提交
221 222
  }

H
Haojun Liao 已提交
223
  if (code != TSDB_CODE_SUCCESS) {
wafwerar's avatar
wafwerar 已提交
224
    taosMemoryFree(pMsg->pData);
dengyihao's avatar
dengyihao 已提交
225
    taosMemoryFree(pMsg->pEpSet);
H
Haojun Liao 已提交
226
    setErrno(pRequest, code);
227 228 229 230 231 232 233

    if (pRequest->body.queryFp != NULL) {
      pRequest->body.queryFp(pRequest->body.param, pRequest, pRequest->code);
    } else {
      tsem_post(&pRequest->body.rspSem);
    }

H
Haojun Liao 已提交
234 235 236
    return code;
  }

S
Shengliang Guan 已提交
237 238 239
  SUseDbRsp usedbRsp = {0};
  tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp);

dengyihao's avatar
dengyihao 已提交
240
  if (strlen(usedbRsp.db) == 0) {
241 242 243 244 245
    if (usedbRsp.errCode != 0) {
      return usedbRsp.errCode;
    } else {
      return TSDB_CODE_APP_ERROR;
    }
wmmhello's avatar
wmmhello 已提交
246 247
  }

S
Shengliang Guan 已提交
248 249 250 251 252 253 254 255 256
  tscTrace("db:%s, usedbRsp received, numOfVgroups:%d", usedbRsp.db, usedbRsp.vgNum);
  for (int32_t i = 0; i < usedbRsp.vgNum; ++i) {
    SVgroupInfo* pInfo = taosArrayGet(usedbRsp.pVgroupInfos, i);
    tscTrace("vgId:%d, numOfEps:%d inUse:%d ", pInfo->vgId, pInfo->epSet.numOfEps, pInfo->epSet.inUse);
    for (int32_t j = 0; j < pInfo->epSet.numOfEps; ++j) {
      tscTrace("vgId:%d, index:%d epset:%s:%u", pInfo->vgId, j, pInfo->epSet.eps[j].fqdn, pInfo->epSet.eps[j].port);
    }
  }

257
  SName name = {0};
L
Liu Jicong 已提交
258
  tNameFromString(&name, usedbRsp.db, T_NAME_ACCT | T_NAME_DB);
S
Shengliang Guan 已提交
259

D
dapan1121 已提交
260 261 262 263 264 265 266
  SUseDbOutput output = {0};
  code = queryBuildUseDbOutput(&output, &usedbRsp);

  if (code != 0) {
    terrno = code;
    if (output.dbVgroup) taosHashCleanup(output.dbVgroup->vgHash);

dengyihao's avatar
dengyihao 已提交
267
    tscError("0x%" PRIx64 " failed to build use db output since %s", pRequest->requestId, terrstr());
D
dapan1121 已提交
268
  } else if (output.dbVgroup && output.dbVgroup->vgHash) {
L
Liu Jicong 已提交
269 270
    struct SCatalog* pCatalog = NULL;

271 272
    int32_t code1 = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
    if (code1 != TSDB_CODE_SUCCESS) {
L
Liu Jicong 已提交
273
      tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId,
274
              tstrerror(code1));
D
dapan1121 已提交
275 276
    } else {
      catalogUpdateDBVgInfo(pCatalog, output.db, output.dbId, output.dbVgroup);
D
dapan1121 已提交
277
      output.dbVgroup = NULL;
D
dapan1121 已提交
278 279 280
    }
  }

D
dapan1121 已提交
281 282
  taosMemoryFreeClear(output.dbVgroup);

S
Shengliang Guan 已提交
283
  tFreeSUsedbRsp(&usedbRsp);
284 285 286

  char db[TSDB_DB_NAME_LEN] = {0};
  tNameGetDbName(&name, db);
287

288
  setConnectionDB(pRequest->pTscObj, db);
wafwerar's avatar
wafwerar 已提交
289
  taosMemoryFree(pMsg->pData);
dengyihao's avatar
dengyihao 已提交
290
  taosMemoryFree(pMsg->pEpSet);
291 292 293 294 295 296

  if (pRequest->body.queryFp != NULL) {
    pRequest->body.queryFp(pRequest->body.param, pRequest, pRequest->code);
  } else {
    tsem_post(&pRequest->body.rspSem);
  }
297
  return 0;
298 299
}

D
dapan1121 已提交
300
/*
 * Handle the response to a create-super-table request.
 *
 * On failure the code is recorded on the request. On success the payload is
 * decoded and its table meta is stored as the request's execution result.
 * When a query callback is registered and the request succeeded, the result
 * is additionally applied to the catalog via handleCreateTbExecRes(); any
 * error from obtaining the catalog handle or from applying the result
 * replaces the code reported to the callback.
 *
 * @param param the SRequestObj this response belongs to
 * @param pMsg  response buffer; pData and pEpSet are freed here
 * @param code  result code delivered with the response
 * @return TSDB_CODE_TSC_INVALID_INPUT if `pMsg` or `param` is NULL,
 *         otherwise the final result code
 */
int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg == NULL || param == NULL) {
    return TSDB_CODE_TSC_INVALID_INPUT;
  }
  SRequestObj* pRequest = param;

  if (code != TSDB_CODE_SUCCESS) {
    setErrno(pRequest, code);
  } else {
    SMCreateStbRsp createRsp = {0};
    SDecoder       coder = {0};
    tDecoderInit(&coder, pMsg->pData, pMsg->len);
    tDecodeSMCreateStbRsp(&coder, &createRsp);
    tDecoderClear(&coder);

    pRequest->body.resInfo.execRes.msgType = TDMT_MND_CREATE_STB;
    pRequest->body.resInfo.execRes.res = createRsp.pMeta;
  }

  taosMemoryFree(pMsg->pEpSet);
  taosMemoryFree(pMsg->pData);

  if (pRequest->body.queryFp != NULL) {
    SExecResult* pRes = &pRequest->body.resInfo.execRes;

    if (code == TSDB_CODE_SUCCESS) {
      SCatalog* pCatalog = NULL;
      int32_t   ret = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
      // Only touch the catalog when the handle was actually obtained, so a
      // lookup failure is reported as-is and no NULL catalog is passed on.
      if (TSDB_CODE_SUCCESS == ret && pRes->res != NULL) {
        ret = handleCreateTbExecRes(pRes->res, pCatalog);
      }

      if (ret != TSDB_CODE_SUCCESS) {
        code = ret;
      }
    }

    pRequest->body.queryFp(pRequest->body.param, pRequest, code);
  } else {
    tsem_post(&pRequest->body.rspSem);
  }
  return code;
}

D
dapan1121 已提交
344
/*
 * Handle the response to a drop-database request.
 *
 * On failure the code is recorded on the request. On success the payload is
 * decoded, the dropped database is removed from the catalog, and the vgroup
 * info of the information schema and performance schema databases is
 * refreshed. A failure to obtain the catalog handle is ignored and does not
 * affect the reported result.
 *
 * @param param the SRequestObj this response belongs to
 * @param pMsg  response buffer; pData and pEpSet are freed here
 * @param code  result code delivered with the response
 * @return `code`, unchanged
 */
int32_t processDropDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pReq = param;

  if (TSDB_CODE_SUCCESS == code) {
    SDropDbRsp dropped = {0};
    tDeserializeSDropDbRsp(pMsg->pData, pMsg->len, &dropped);

    struct SCatalog* pCtg = NULL;
    if (TSDB_CODE_SUCCESS == catalogGetHandle(pReq->pTscObj->pAppInfo->clusterId, &pCtg)) {
      catalogRemoveDB(pCtg, dropped.db, dropped.uid);

      STscObj*         pConn = pReq->pTscObj;
      SRequestConnInfo connInfo = {.pTrans = pConn->pAppInfo->pTransporter,
                                   .requestId = pReq->requestId,
                                   .requestObjRefId = pReq->self,
                                   .mgmtEps = getEpSet_s(&pConn->pAppInfo->mgmtEp)};
      char             fullName[TSDB_DB_FNAME_LEN];

      snprintf(fullName, sizeof(fullName) - 1, "%d.%s", pConn->acctId, TSDB_INFORMATION_SCHEMA_DB);
      catalogRefreshDBVgInfo(pCtg, &connInfo, fullName);

      snprintf(fullName, sizeof(fullName) - 1, "%d.%s", pConn->acctId, TSDB_PERFORMANCE_SCHEMA_DB);
      catalogRefreshDBVgInfo(pCtg, &connInfo, fullName);
    }
  } else {
    setErrno(pReq, code);
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  // Notify the issuer: callback if registered, semaphore otherwise.
  if (NULL == pReq->body.queryFp) {
    tsem_post(&pReq->body.rspSem);
  } else {
    pReq->body.queryFp(pReq->body.param, pReq, code);
  }
  return code;
}

D
dapan1121 已提交
381
/*
 * Handle the response to an alter-super-table request.
 *
 * On failure the code is recorded on the request. On success the payload is
 * decoded and its table meta is stored as the request's execution result.
 * When a query callback is registered and the request succeeded, the result
 * is additionally applied to the catalog via handleAlterTbExecRes(); any
 * error from obtaining the catalog handle or from applying the result
 * replaces the code reported to the callback.
 *
 * @param param the SRequestObj this response belongs to
 * @param pMsg  response buffer; pData and pEpSet are freed here
 * @param code  result code delivered with the response
 * @return the final result code
 */
int32_t processAlterStbRsp(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pRequest = param;
  if (code != TSDB_CODE_SUCCESS) {
    setErrno(pRequest, code);
  } else {
    SMAlterStbRsp alterRsp = {0};
    SDecoder      coder = {0};
    tDecoderInit(&coder, pMsg->pData, pMsg->len);
    tDecodeSMAlterStbRsp(&coder, &alterRsp);
    tDecoderClear(&coder);

    pRequest->body.resInfo.execRes.msgType = TDMT_MND_ALTER_STB;
    pRequest->body.resInfo.execRes.res = alterRsp.pMeta;
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  if (pRequest->body.queryFp != NULL) {
    SExecResult* pRes = &pRequest->body.resInfo.execRes;

    if (code == TSDB_CODE_SUCCESS) {
      SCatalog* pCatalog = NULL;
      int32_t   ret = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
      // Only touch the catalog when the handle was actually obtained, so a
      // lookup failure is reported as-is and no NULL catalog is passed on.
      if (TSDB_CODE_SUCCESS == ret && pRes->res != NULL) {
        ret = handleAlterTbExecRes(pRes->res, pCatalog);
      }

      if (ret != TSDB_CODE_SUCCESS) {
        code = ret;
      }
    }

    pRequest->body.queryFp(pRequest->body.param, pRequest, code);
  } else {
    tsem_post(&pRequest->body.rspSem);
  }
  return code;
}

D
dapan1121 已提交
421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440
/*
 * Build a two-column VARCHAR data block (name, value) from a list of
 * SVariablesInfo entries, one row per entry.
 *
 * @param pVars array of SVariablesInfo
 * @param block out: receives the newly built block on success; left
 *              untouched on failure
 * @return TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY if an allocation fails,
 *         or the error returned by blockDataEnsureCapacity / colDataSetVal.
 *         On failure everything allocated here is released.
 */
static int32_t buildShowVariablesBlock(SArray* pVars, SSDataBlock** block) {
  int32_t         code = TSDB_CODE_SUCCESS;
  int32_t         numOfCfg = taosArrayGetSize(pVars);
  SColumnInfoData infoData = {0};

  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (NULL == pBlock) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pBlock->info.hasVarCol = true;

  pBlock->pDataBlock = taosArrayInit(SHOW_VARIABLES_RESULT_COLS, sizeof(SColumnInfoData));
  if (NULL == pBlock->pDataBlock) {
    // Nothing but the block header exists yet.
    taosMemoryFree(pBlock);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // Column 0: variable name.
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
  infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD1_LEN;
  if (NULL == taosArrayPush(pBlock->pDataBlock, &infoData)) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  // Column 1: variable value.
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
  infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD2_LEN;
  if (NULL == taosArrayPush(pBlock->pDataBlock, &infoData)) {
    code = TSDB_CODE_OUT_OF_MEMORY;
    goto _error;
  }

  code = blockDataEnsureCapacity(pBlock, numOfCfg);
  if (code != TSDB_CODE_SUCCESS) {
    goto _error;
  }

  for (int32_t i = 0; i < numOfCfg; ++i) {
    SVariablesInfo* pInfo = taosArrayGet(pVars, i);

    char name[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(name, pInfo->name, TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE);
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0);
    code = colDataSetVal(pColInfo, i, name, false);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }

    char value[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(value, pInfo->value, TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE);
    pColInfo = taosArrayGet(pBlock->pDataBlock, 1);
    code = colDataSetVal(pColInfo, i, value, false);
    if (code != TSDB_CODE_SUCCESS) {
      goto _error;
    }
  }

  pBlock->info.rows = numOfCfg;

  *block = pBlock;

  return TSDB_CODE_SUCCESS;

_error:
  blockDataDestroy(pBlock);
  return code;
}

/*
 * Build a retrieve-table response holding the encoded (name, value) block
 * for a list of variables.
 *
 * @param pVars array of SVariablesInfo
 * @param pRsp  out: set to the allocated response once allocation has been
 *              attempted (NULL if it failed). It stays allocated when the
 *              encoded length check fails, so the caller must free it in
 *              that case too. Untouched if building the block fails.
 * @return TSDB_CODE_SUCCESS, the block builder's error,
 *         TSDB_CODE_OUT_OF_MEMORY, or TSDB_CODE_TSC_INVALID_INPUT when the
 *         encoded length differs from the size computed up front
 */
static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
  SSDataBlock* pDataBlock = NULL;
  int32_t      rc = buildShowVariablesBlock(pVars, &pDataBlock);
  if (rc) {
    return rc;
  }

  size_t payloadLen = blockGetEncodeSize(pDataBlock);
  size_t totalLen = sizeof(SRetrieveTableRsp) + payloadLen;

  SRetrieveTableRsp* pOut = taosMemoryCalloc(1, totalLen);
  *pRsp = pOut;
  if (NULL == pOut) {
    blockDataDestroy(pDataBlock);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pOut->useconds = 0;
  pOut->completed = 1;
  pOut->precision = 0;
  pOut->compressed = 0;
  pOut->compLen = 0;
  pOut->numOfRows = htobe64((int64_t)pDataBlock->info.rows);
  pOut->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS);

  int32_t len = blockEncode(pDataBlock, pOut->data, SHOW_VARIABLES_RESULT_COLS);
  blockDataDestroy(pDataBlock);

  // The encoder must have produced exactly the number of bytes reserved.
  if (len != payloadLen) {
    uError("buildShowVariablesRsp error, len:%d != rspSize - sizeof(SRetrieveTableRsp):%" PRIu64, len,
           (uint64_t)(payloadLen));
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
495
/*
 * Handle the response to a show-variables request.
 *
 * On failure the code is recorded on the request. On success the payload is
 * decoded, converted into a retrieve-table response and installed as the
 * request's query result; if any of these steps fails the intermediate
 * response is freed and the step's error becomes the reported code.
 *
 * @param param the SRequestObj this response belongs to
 * @param pMsg  response buffer; pData and pEpSet are freed here
 * @param code  result code delivered with the response
 * @return the final result code
 */
int32_t processShowVariablesRsp(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pReq = param;

  if (TSDB_CODE_SUCCESS == code) {
    SShowVariablesRsp  varsRsp = {0};
    SRetrieveTableRsp* pTableRsp = NULL;

    code = tDeserializeSShowVariablesRsp(pMsg->pData, pMsg->len, &varsRsp);
    if (TSDB_CODE_SUCCESS == code) {
      code = buildShowVariablesRsp(varsRsp.variables, &pTableRsp);
      if (TSDB_CODE_SUCCESS == code) {
        code = setQueryResultFromRsp(&pReq->body.resInfo, pTableRsp, false, true);
      }
    }

    // The response buffer is freed here whenever any step above failed.
    if (code != 0) {
      taosMemoryFree(pTableRsp);
    }
    tFreeSShowVariablesRsp(&varsRsp);
  } else {
    setErrno(pReq, code);
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  // Notify the issuer: callback if registered, semaphore otherwise.
  if (NULL == pReq->body.queryFp) {
    tsem_post(&pReq->body.rspSem);
  } else {
    pReq->body.queryFp(pReq->body.param, pReq, code);
  }
  return code;
}

H
Haojun Liao 已提交
527 528 529 530 531 532 533 534 535 536 537 538 539 540
__async_send_cb_fn_t getMsgRspHandle(int32_t msgType) {
  switch (msgType) {
    case TDMT_MND_CONNECT:
      return processConnectRsp;
    case TDMT_MND_CREATE_DB:
      return processCreateDbRsp;
    case TDMT_MND_USE_DB:
      return processUseDbRsp;
    case TDMT_MND_CREATE_STB:
      return processCreateSTableRsp;
    case TDMT_MND_DROP_DB:
      return processDropDbRsp;
    case TDMT_MND_ALTER_STB:
      return processAlterStbRsp;
D
dapan1121 已提交
541 542
    case TDMT_MND_SHOW_VARIABLES:
      return processShowVariablesRsp;
H
Haojun Liao 已提交
543 544 545
    default:
      return genericRspCallback;
  }
L
Liu Jicong 已提交
546
}