clientMsgHandler.c 8.8 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

L
Liu Jicong 已提交
16
#include "catalog.h"
H
Haojun Liao 已提交
17
#include "clientInt.h"
18
#include "clientLog.h"
L
Liu Jicong 已提交
19
#include "os.h"
D
dapan1121 已提交
20
#include "query.h"
L
Liu Jicong 已提交
21 22
#include "tdef.h"
#include "tname.h"
H
Haojun Liao 已提交
23

S
Shengliang Guan 已提交
24
int32_t (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code);
H
Haojun Liao 已提交
25

H
Haojun Liao 已提交
26 27 28 29 30
static void setErrno(SRequestObj* pRequest, int32_t code) {
  pRequest->code = code;
  terrno = code;
}

S
Shengliang Guan 已提交
31
int32_t genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code) {
32
  SRequestObj* pRequest = param;
H
Haojun Liao 已提交
33 34
  setErrno(pRequest, code);

wafwerar's avatar
wafwerar 已提交
35
  taosMemoryFree(pMsg->pData);
36 37 38 39 40
  if (pRequest->body.queryFp != NULL) {
    pRequest->body.queryFp(pRequest->body.param, pRequest, code);
  } else {
    tsem_post(&pRequest->body.rspSem);
  }
H
Haojun Liao 已提交
41
  return code;
42 43
}

S
Shengliang Guan 已提交
44
int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
45
  SRequestObj* pRequest = param;
H
Haojun Liao 已提交
46
  if (code != TSDB_CODE_SUCCESS) {
wafwerar's avatar
wafwerar 已提交
47
    taosMemoryFree(pMsg->pData);
H
Haojun Liao 已提交
48
    setErrno(pRequest, code);
49
    tsem_post(&pRequest->body.rspSem);
H
Haojun Liao 已提交
50 51
    return code;
  }
52

S
Shengliang Guan 已提交
53
  STscObj* pTscObj = pRequest->pTscObj;
H
Haojun Liao 已提交
54

S
Shengliang Guan 已提交
55 56
  SConnectRsp connectRsp = {0};
  tDeserializeSConnectRsp(pMsg->pData, pMsg->len, &connectRsp);
L
Liu Jicong 已提交
57 58 59 60 61 62 63
  /*assert(connectRsp.epSet.numOfEps > 0);*/
  if (connectRsp.epSet.numOfEps == 0) {
    taosMemoryFree(pMsg->pData);
    setErrno(pRequest, TSDB_CODE_MND_APP_ERROR);
    tsem_post(&pRequest->body.rspSem);
    return code;
  }
H
Haojun Liao 已提交
64

D
dapan1121 已提交
65
  if (connectRsp.dnodeNum == 1) {
dengyihao's avatar
dengyihao 已提交
66 67 68 69
    SEpSet srcEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
    SEpSet dstEpSet = connectRsp.epSet;
    rpcSetDefaultAddr(pTscObj->pAppInfo->pTransporter, srcEpSet.eps[srcEpSet.inUse].fqdn,
                      dstEpSet.eps[dstEpSet.inUse].fqdn);
D
dapan1121 已提交
70
  } else if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &connectRsp.epSet)) {
S
Shengliang Guan 已提交
71
    updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &connectRsp.epSet);
72 73
  }

S
Shengliang Guan 已提交
74
  for (int32_t i = 0; i < connectRsp.epSet.numOfEps; ++i) {
S
Shengliang Guan 已提交
75
    tscDebug("0x%" PRIx64 " epSet.fqdn[%d]:%s port:%d, connObj:0x%" PRIx64, pRequest->requestId, i,
S
Shengliang Guan 已提交
76
             connectRsp.epSet.eps[i].fqdn, connectRsp.epSet.eps[i].port, pTscObj->id);
H
Haojun Liao 已提交
77 78
  }

S
Shengliang Guan 已提交
79 80 81
  pTscObj->connId = connectRsp.connId;
  pTscObj->acctId = connectRsp.acctId;
  tstrncpy(pTscObj->ver, connectRsp.sVersion, tListLen(pTscObj->ver));
H
Haojun Liao 已提交
82 83

  // update the appInstInfo
S
Shengliang Guan 已提交
84
  pTscObj->pAppInfo->clusterId = connectRsp.clusterId;
H
Haojun Liao 已提交
85 86
  atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);

L
Liu Jicong 已提交
87
  pTscObj->connType = connectRsp.connType;
88

89
  hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pTscObj->id, connectRsp.clusterId, connectRsp.connType);
L
Liu Jicong 已提交
90

91
  //  pRequest->body.resInfo.pRspMsg = pMsg->pData;
S
Shengliang Guan 已提交
92
  tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, connectRsp.clusterId,
93
           pTscObj->pAppInfo->numOfConns);
94

wafwerar's avatar
wafwerar 已提交
95
  taosMemoryFree(pMsg->pData);
96
  tsem_post(&pRequest->body.rspSem);
97 98
  return 0;
}
H
Haojun Liao 已提交
99

L
Liu Jicong 已提交
100
SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pRequest) {
wafwerar's avatar
wafwerar 已提交
101
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
102

H
Haojun Liao 已提交
103
  pMsgSendInfo->requestObjRefId = pRequest->self;
L
Liu Jicong 已提交
104 105 106
  pMsgSendInfo->requestId = pRequest->requestId;
  pMsgSendInfo->param = pRequest;
  pMsgSendInfo->msgType = pRequest->type;
107

H
Haojun Liao 已提交
108 109
  assert(pRequest != NULL);
  pMsgSendInfo->msgInfo = pRequest->body.requestMsg;
110

L
Liu Jicong 已提交
111 112 113
  pMsgSendInfo->fp = (handleRequestRspFp[TMSG_INDEX(pRequest->type)] == NULL)
                         ? genericRspCallback
                         : handleRequestRspFp[TMSG_INDEX(pRequest->type)];
114
  return pMsgSendInfo;
115 116
}

117
int32_t processCreateDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
H
Haojun Liao 已提交
118
  // todo rsp with the vnode id list
119
  SRequestObj* pRequest = param;
wafwerar's avatar
wafwerar 已提交
120
  taosMemoryFree(pMsg->pData);
X
Xiaoyu Wang 已提交
121 122 123
  if (code != TSDB_CODE_SUCCESS) {
    setErrno(pRequest, code);
  }
124 125 126 127 128 129

  if (pRequest->body.queryFp) {
    pRequest->body.queryFp(pRequest->body.param, pRequest, code);
  } else {
    tsem_post(&pRequest->body.rspSem);
  }
X
Xiaoyu Wang 已提交
130
  return code;
H
Haojun Liao 已提交
131
}
H
Haojun Liao 已提交
132

133
int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
H
Haojun Liao 已提交
134 135
  SRequestObj* pRequest = param;

D
dapan1121 已提交
136 137 138
  if (TSDB_CODE_MND_DB_NOT_EXIST == code) {
    SUseDbRsp usedbRsp = {0};
    tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp);
L
Liu Jicong 已提交
139
    struct SCatalog* pCatalog = NULL;
D
dapan1121 已提交
140 141

    if (usedbRsp.vgVersion >= 0) {
142
      uint64_t clusterId = pRequest->pTscObj->pAppInfo->clusterId;
dengyihao's avatar
dengyihao 已提交
143
      int32_t  code1 = catalogGetHandle(clusterId, &pCatalog);
144
      if (code1 != TSDB_CODE_SUCCESS) {
dengyihao's avatar
dengyihao 已提交
145 146
        tscWarn("0x%" PRIx64 "catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->requestId, clusterId,
                tstrerror(code1));
D
dapan1121 已提交
147 148 149 150 151
      } else {
        catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid);
      }
    }

L
Liu Jicong 已提交
152
    tFreeSUsedbRsp(&usedbRsp);
D
dapan1121 已提交
153 154
  }

H
Haojun Liao 已提交
155
  if (code != TSDB_CODE_SUCCESS) {
wafwerar's avatar
wafwerar 已提交
156
    taosMemoryFree(pMsg->pData);
H
Haojun Liao 已提交
157
    setErrno(pRequest, code);
158 159 160 161 162 163 164

    if (pRequest->body.queryFp != NULL) {
      pRequest->body.queryFp(pRequest->body.param, pRequest, pRequest->code);
    } else {
      tsem_post(&pRequest->body.rspSem);
    }

H
Haojun Liao 已提交
165 166 167
    return code;
  }

S
Shengliang Guan 已提交
168 169 170
  SUseDbRsp usedbRsp = {0};
  tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp);

171
  SName name = {0};
L
Liu Jicong 已提交
172
  tNameFromString(&name, usedbRsp.db, T_NAME_ACCT | T_NAME_DB);
S
Shengliang Guan 已提交
173

D
dapan1121 已提交
174 175 176 177 178 179
  SUseDbOutput output = {0};
  code = queryBuildUseDbOutput(&output, &usedbRsp);

  if (code != 0) {
    terrno = code;
    if (output.dbVgroup) taosHashCleanup(output.dbVgroup->vgHash);
wafwerar's avatar
wafwerar 已提交
180
    taosMemoryFreeClear(output.dbVgroup);
D
dapan1121 已提交
181

dengyihao's avatar
dengyihao 已提交
182
    tscError("0x%" PRIx64 " failed to build use db output since %s", pRequest->requestId, terrstr());
D
dapan1121 已提交
183
  } else if (output.dbVgroup) {
L
Liu Jicong 已提交
184 185
    struct SCatalog* pCatalog = NULL;

186 187
    int32_t code1 = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
    if (code1 != TSDB_CODE_SUCCESS) {
L
Liu Jicong 已提交
188
      tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId,
189
              tstrerror(code1));
D
dapan1121 已提交
190
      taosMemoryFreeClear(output.dbVgroup);
D
dapan1121 已提交
191 192 193 194 195
    } else {
      catalogUpdateDBVgInfo(pCatalog, output.db, output.dbId, output.dbVgroup);
    }
  }

S
Shengliang Guan 已提交
196
  tFreeSUsedbRsp(&usedbRsp);
197 198 199

  char db[TSDB_DB_NAME_LEN] = {0};
  tNameGetDbName(&name, db);
200

201
  setConnectionDB(pRequest->pTscObj, db);
wafwerar's avatar
wafwerar 已提交
202
  taosMemoryFree(pMsg->pData);
203 204 205 206 207 208

  if (pRequest->body.queryFp != NULL) {
    pRequest->body.queryFp(pRequest->body.param, pRequest, pRequest->code);
  } else {
    tsem_post(&pRequest->body.rspSem);
  }
209
  return 0;
210 211
}

212
int32_t processCreateTableRsp(void* param, const SDataBuf* pMsg, int32_t code) {
H
Haojun Liao 已提交
213
  assert(pMsg != NULL && param != NULL);
214
  SRequestObj* pRequest = param;
H
Haojun Liao 已提交
215

wafwerar's avatar
wafwerar 已提交
216
  taosMemoryFree(pMsg->pData);
H
Haojun Liao 已提交
217 218 219 220
  if (code != TSDB_CODE_SUCCESS) {
    setErrno(pRequest, code);
  }

221 222 223 224 225
  if (pRequest->body.queryFp != NULL) {
    pRequest->body.queryFp(pRequest->body.param, pRequest, code);
  } else {
    tsem_post(&pRequest->body.rspSem);
  }
H
Haojun Liao 已提交
226
  return code;
227 228
}

229 230
int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
  SRequestObj* pRequest = param;
H
Haojun Liao 已提交
231 232
  if (code != TSDB_CODE_SUCCESS) {
    setErrno(pRequest, code);
233 234 235
  } else {
    SDropDbRsp dropdbRsp = {0};
    tDeserializeSDropDbRsp(pMsg->pData, pMsg->len, &dropdbRsp);
D
dapan1121 已提交
236

237 238 239 240
    struct SCatalog* pCatalog = NULL;
    catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
    catalogRemoveDB(pCatalog, dropdbRsp.db, dropdbRsp.uid);
  }
D
dapan1121 已提交
241

242 243 244 245 246
  if (pRequest->body.queryFp != NULL) {
    pRequest->body.queryFp(pRequest->body.param, pRequest, code);
  } else {
    tsem_post(&pRequest->body.rspSem);
  }
H
Haojun Liao 已提交
247
  return code;
248 249
}

D
dapan1121 已提交
250 251 252 253 254 255 256 257 258 259 260 261 262
int32_t processAlterStbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
  SRequestObj* pRequest = param;
  if (code != TSDB_CODE_SUCCESS) {
    setErrno(pRequest, code);
    tsem_post(&pRequest->body.rspSem);
    return code;
  }

  SMAlterStbRsp alterRsp = {0};
  SDecoder coder = {0};
  tDecoderInit(&coder, pMsg->pData, pMsg->len);
  tDecodeSMAlterStbRsp(&coder, &alterRsp);
  tDecoderClear(&coder);
D
dapan1121 已提交
263 264 265

  pRequest->body.resInfo.execRes.msgType = TDMT_MND_ALTER_STB;
  pRequest->body.resInfo.execRes.res = alterRsp.pMeta;
D
dapan1121 已提交
266 267 268 269 270 271

  tsem_post(&pRequest->body.rspSem);
  return code;
}


272
// todo refactor: this arraylist is too large
H
Haojun Liao 已提交
273
void initMsgHandleFp() {
S
Shengliang Guan 已提交
274 275 276
  handleRequestRspFp[TMSG_INDEX(TDMT_MND_CONNECT)] = processConnectRsp;
  handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_DB)] = processCreateDbRsp;
  handleRequestRspFp[TMSG_INDEX(TDMT_MND_USE_DB)] = processUseDbRsp;
277
  handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_STB)] = processCreateTableRsp;
S
Shengliang Guan 已提交
278
  handleRequestRspFp[TMSG_INDEX(TDMT_MND_DROP_DB)] = processDropDbRsp;
D
dapan1121 已提交
279
  handleRequestRspFp[TMSG_INDEX(TDMT_MND_ALTER_STB)] = processAlterStbRsp;
L
Liu Jicong 已提交
280
}