clientMsgHandler.c 9.1 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "catalog.h"
H
Haojun Liao 已提交
17
#include "clientInt.h"
18
#include "clientLog.h"
L
Liu Jicong 已提交
19
#include "os.h"
D
dapan1121 已提交
20
#include "query.h"
L
Liu Jicong 已提交
21 22
#include "tdef.h"
#include "tname.h"
H
Haojun Liao 已提交
23

H
Haojun Liao 已提交
24 25 26 27 28
static void setErrno(SRequestObj* pRequest, int32_t code) {
  pRequest->code = code;
  terrno = code;
}

S
Shengliang Guan 已提交
29
int32_t genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code) {
30
  SRequestObj* pRequest = param;
H
Haojun Liao 已提交
31 32
  setErrno(pRequest, code);

wafwerar's avatar
wafwerar 已提交
33
  taosMemoryFree(pMsg->pData);
34 35 36 37 38
  if (pRequest->body.queryFp != NULL) {
    pRequest->body.queryFp(pRequest->body.param, pRequest, code);
  } else {
    tsem_post(&pRequest->body.rspSem);
  }
H
Haojun Liao 已提交
39
  return code;
40 41
}

S
Shengliang Guan 已提交
42
int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
43
  SRequestObj* pRequest = param;
H
Haojun Liao 已提交
44
  if (code != TSDB_CODE_SUCCESS) {
wafwerar's avatar
wafwerar 已提交
45
    taosMemoryFree(pMsg->pData);
H
Haojun Liao 已提交
46
    setErrno(pRequest, code);
47
    tsem_post(&pRequest->body.rspSem);
H
Haojun Liao 已提交
48 49
    return code;
  }
50

S
Shengliang Guan 已提交
51
  STscObj* pTscObj = pRequest->pTscObj;
H
Haojun Liao 已提交
52

S
Shengliang Guan 已提交
53 54
  SConnectRsp connectRsp = {0};
  tDeserializeSConnectRsp(pMsg->pData, pMsg->len, &connectRsp);
L
Liu Jicong 已提交
55 56 57 58 59 60 61
  /*assert(connectRsp.epSet.numOfEps > 0);*/
  if (connectRsp.epSet.numOfEps == 0) {
    taosMemoryFree(pMsg->pData);
    setErrno(pRequest, TSDB_CODE_MND_APP_ERROR);
    tsem_post(&pRequest->body.rspSem);
    return code;
  }
H
Haojun Liao 已提交
62

D
dapan1121 已提交
63
  if (connectRsp.dnodeNum == 1) {
dengyihao's avatar
dengyihao 已提交
64 65 66 67
    SEpSet srcEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
    SEpSet dstEpSet = connectRsp.epSet;
    rpcSetDefaultAddr(pTscObj->pAppInfo->pTransporter, srcEpSet.eps[srcEpSet.inUse].fqdn,
                      dstEpSet.eps[dstEpSet.inUse].fqdn);
D
dapan1121 已提交
68
  } else if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &connectRsp.epSet)) {
S
Shengliang Guan 已提交
69
    updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &connectRsp.epSet);
70 71
  }

S
Shengliang Guan 已提交
72
  for (int32_t i = 0; i < connectRsp.epSet.numOfEps; ++i) {
S
Shengliang Guan 已提交
73
    tscDebug("0x%" PRIx64 " epSet.fqdn[%d]:%s port:%d, connObj:0x%" PRIx64, pRequest->requestId, i,
S
Shengliang Guan 已提交
74
             connectRsp.epSet.eps[i].fqdn, connectRsp.epSet.eps[i].port, pTscObj->id);
H
Haojun Liao 已提交
75 76
  }

S
Shengliang Guan 已提交
77 78 79
  pTscObj->connId = connectRsp.connId;
  pTscObj->acctId = connectRsp.acctId;
  tstrncpy(pTscObj->ver, connectRsp.sVersion, tListLen(pTscObj->ver));
H
Haojun Liao 已提交
80 81

  // update the appInstInfo
S
Shengliang Guan 已提交
82
  pTscObj->pAppInfo->clusterId = connectRsp.clusterId;
H
Haojun Liao 已提交
83 84
  atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);

L
Liu Jicong 已提交
85
  pTscObj->connType = connectRsp.connType;
86

87
  hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pTscObj->id, connectRsp.clusterId, connectRsp.connType);
L
Liu Jicong 已提交
88

89
  //  pRequest->body.resInfo.pRspMsg = pMsg->pData;
S
Shengliang Guan 已提交
90
  tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, connectRsp.clusterId,
91
           pTscObj->pAppInfo->numOfConns);
92

wafwerar's avatar
wafwerar 已提交
93
  taosMemoryFree(pMsg->pData);
94
  tsem_post(&pRequest->body.rspSem);
95 96
  return 0;
}
H
Haojun Liao 已提交
97

L
Liu Jicong 已提交
98
SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pRequest) {
wafwerar's avatar
wafwerar 已提交
99
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
100

H
Haojun Liao 已提交
101
  pMsgSendInfo->requestObjRefId = pRequest->self;
L
Liu Jicong 已提交
102 103 104
  pMsgSendInfo->requestId = pRequest->requestId;
  pMsgSendInfo->param = pRequest;
  pMsgSendInfo->msgType = pRequest->type;
105

H
Haojun Liao 已提交
106 107
  assert(pRequest != NULL);
  pMsgSendInfo->msgInfo = pRequest->body.requestMsg;
H
Haojun Liao 已提交
108
  pMsgSendInfo->fp = getMsgRspHandle(pRequest->type);
109
  return pMsgSendInfo;
110 111
}

112
int32_t processCreateDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
H
Haojun Liao 已提交
113
  // todo rsp with the vnode id list
114
  SRequestObj* pRequest = param;
wafwerar's avatar
wafwerar 已提交
115
  taosMemoryFree(pMsg->pData);
X
Xiaoyu Wang 已提交
116 117 118
  if (code != TSDB_CODE_SUCCESS) {
    setErrno(pRequest, code);
  }
119 120 121 122 123 124

  if (pRequest->body.queryFp) {
    pRequest->body.queryFp(pRequest->body.param, pRequest, code);
  } else {
    tsem_post(&pRequest->body.rspSem);
  }
X
Xiaoyu Wang 已提交
125
  return code;
H
Haojun Liao 已提交
126
}
H
Haojun Liao 已提交
127

128
int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
H
Haojun Liao 已提交
129 130
  SRequestObj* pRequest = param;

D
dapan1121 已提交
131 132 133
  if (TSDB_CODE_MND_DB_NOT_EXIST == code) {
    SUseDbRsp usedbRsp = {0};
    tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp);
L
Liu Jicong 已提交
134
    struct SCatalog* pCatalog = NULL;
D
dapan1121 已提交
135 136

    if (usedbRsp.vgVersion >= 0) {
137
      uint64_t clusterId = pRequest->pTscObj->pAppInfo->clusterId;
dengyihao's avatar
dengyihao 已提交
138
      int32_t  code1 = catalogGetHandle(clusterId, &pCatalog);
139
      if (code1 != TSDB_CODE_SUCCESS) {
dengyihao's avatar
dengyihao 已提交
140 141
        tscWarn("0x%" PRIx64 "catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->requestId, clusterId,
                tstrerror(code1));
D
dapan1121 已提交
142 143 144 145 146
      } else {
        catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid);
      }
    }

L
Liu Jicong 已提交
147
    tFreeSUsedbRsp(&usedbRsp);
D
dapan1121 已提交
148 149
  }

H
Haojun Liao 已提交
150
  if (code != TSDB_CODE_SUCCESS) {
wafwerar's avatar
wafwerar 已提交
151
    taosMemoryFree(pMsg->pData);
H
Haojun Liao 已提交
152
    setErrno(pRequest, code);
153 154 155 156 157 158 159

    if (pRequest->body.queryFp != NULL) {
      pRequest->body.queryFp(pRequest->body.param, pRequest, pRequest->code);
    } else {
      tsem_post(&pRequest->body.rspSem);
    }

H
Haojun Liao 已提交
160 161 162
    return code;
  }

S
Shengliang Guan 已提交
163 164 165
  SUseDbRsp usedbRsp = {0};
  tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp);

166
  SName name = {0};
L
Liu Jicong 已提交
167
  tNameFromString(&name, usedbRsp.db, T_NAME_ACCT | T_NAME_DB);
S
Shengliang Guan 已提交
168

D
dapan1121 已提交
169 170 171 172 173 174
  SUseDbOutput output = {0};
  code = queryBuildUseDbOutput(&output, &usedbRsp);

  if (code != 0) {
    terrno = code;
    if (output.dbVgroup) taosHashCleanup(output.dbVgroup->vgHash);
wafwerar's avatar
wafwerar 已提交
175
    taosMemoryFreeClear(output.dbVgroup);
D
dapan1121 已提交
176

dengyihao's avatar
dengyihao 已提交
177
    tscError("0x%" PRIx64 " failed to build use db output since %s", pRequest->requestId, terrstr());
D
dapan1121 已提交
178
  } else if (output.dbVgroup && output.dbVgroup->vgHash) {
L
Liu Jicong 已提交
179 180
    struct SCatalog* pCatalog = NULL;

181 182
    int32_t code1 = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
    if (code1 != TSDB_CODE_SUCCESS) {
L
Liu Jicong 已提交
183
      tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId,
184
              tstrerror(code1));
D
dapan1121 已提交
185
      taosMemoryFreeClear(output.dbVgroup);
D
dapan1121 已提交
186 187 188 189 190
    } else {
      catalogUpdateDBVgInfo(pCatalog, output.db, output.dbId, output.dbVgroup);
    }
  }

S
Shengliang Guan 已提交
191
  tFreeSUsedbRsp(&usedbRsp);
192 193 194

  char db[TSDB_DB_NAME_LEN] = {0};
  tNameGetDbName(&name, db);
195

196
  setConnectionDB(pRequest->pTscObj, db);
wafwerar's avatar
wafwerar 已提交
197
  taosMemoryFree(pMsg->pData);
198 199 200 201 202 203

  if (pRequest->body.queryFp != NULL) {
    pRequest->body.queryFp(pRequest->body.param, pRequest, pRequest->code);
  } else {
    tsem_post(&pRequest->body.rspSem);
  }
204
  return 0;
205 206
}

H
Haojun Liao 已提交
207
int32_t processCreateSTableRsp(void* param, const SDataBuf* pMsg, int32_t code) {
H
Haojun Liao 已提交
208
  assert(pMsg != NULL && param != NULL);
209
  SRequestObj* pRequest = param;
H
Haojun Liao 已提交
210

wafwerar's avatar
wafwerar 已提交
211
  taosMemoryFree(pMsg->pData);
H
Haojun Liao 已提交
212 213 214 215
  if (code != TSDB_CODE_SUCCESS) {
    setErrno(pRequest, code);
  }

216
  if (pRequest->body.queryFp != NULL) {
217
    removeMeta(pRequest->pTscObj, pRequest->tableList);
218 219 220 221
    pRequest->body.queryFp(pRequest->body.param, pRequest, code);
  } else {
    tsem_post(&pRequest->body.rspSem);
  }
H
Haojun Liao 已提交
222
  return code;
223 224
}

225 226
int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
  SRequestObj* pRequest = param;
H
Haojun Liao 已提交
227 228
  if (code != TSDB_CODE_SUCCESS) {
    setErrno(pRequest, code);
229 230 231
  } else {
    SDropDbRsp dropdbRsp = {0};
    tDeserializeSDropDbRsp(pMsg->pData, pMsg->len, &dropdbRsp);
D
dapan1121 已提交
232

233 234 235 236
    struct SCatalog* pCatalog = NULL;
    catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
    catalogRemoveDB(pCatalog, dropdbRsp.db, dropdbRsp.uid);
  }
D
dapan1121 已提交
237

238 239 240 241 242
  if (pRequest->body.queryFp != NULL) {
    pRequest->body.queryFp(pRequest->body.param, pRequest, code);
  } else {
    tsem_post(&pRequest->body.rspSem);
  }
H
Haojun Liao 已提交
243
  return code;
244 245
}

D
dapan1121 已提交
246 247 248 249
int32_t processAlterStbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
  SRequestObj* pRequest = param;
  if (code != TSDB_CODE_SUCCESS) {
    setErrno(pRequest, code);
250 251 252 253 254 255 256 257 258
  } else {
    SMAlterStbRsp alterRsp = {0};
    SDecoder      coder = {0};
    tDecoderInit(&coder, pMsg->pData, pMsg->len);
    tDecodeSMAlterStbRsp(&coder, &alterRsp);
    tDecoderClear(&coder);

    pRequest->body.resInfo.execRes.msgType = TDMT_MND_ALTER_STB;
    pRequest->body.resInfo.execRes.res = alterRsp.pMeta;
D
dapan1121 已提交
259 260
  }

261
  if (pRequest->body.queryFp != NULL) {
262 263 264 265 266 267 268 269 270 271 272 273 274 275
    SQueryExecRes* pRes = &pRequest->body.resInfo.execRes;

    if (code == TSDB_CODE_SUCCESS) {
      SCatalog* pCatalog = NULL;
      int32_t   ret = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
      if (pRes->res != NULL) {
        ret = handleAlterTbExecRes(pRes->res, pCatalog);
      }

      if (ret != TSDB_CODE_SUCCESS) {
        code = ret;
      }
    }

276 277 278 279
    pRequest->body.queryFp(pRequest->body.param, pRequest, code);
  } else {
    tsem_post(&pRequest->body.rspSem);
  }
D
dapan1121 已提交
280 281 282
  return code;
}

H
Haojun Liao 已提交
283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299
__async_send_cb_fn_t getMsgRspHandle(int32_t msgType) {
  switch (msgType) {
    case TDMT_MND_CONNECT:
      return processConnectRsp;
    case TDMT_MND_CREATE_DB:
      return processCreateDbRsp;
    case TDMT_MND_USE_DB:
      return processUseDbRsp;
    case TDMT_MND_CREATE_STB:
      return processCreateSTableRsp;
    case TDMT_MND_DROP_DB:
      return processDropDbRsp;
    case TDMT_MND_ALTER_STB:
      return processAlterStbRsp;
    default:
      return genericRspCallback;
  }
L
Liu Jicong 已提交
300
}