clientMsgHandler.c 12.7 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Haojun Liao 已提交
16
#include "os.h"
H
Haojun Liao 已提交
17
#include "tdef.h"
H
Haojun Liao 已提交
18
#include "tname.h"
H
Haojun Liao 已提交
19
#include "clientInt.h"
20
#include "clientLog.h"
H
Haojun Liao 已提交
21 22
#include "trpc.h"

H
Haojun Liao 已提交
23
int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code);
H
Haojun Liao 已提交
24

H
Haojun Liao 已提交
25
int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code) {
26 27 28
  SRequestObj* pRequest = param;
  pRequest->code = code;
  sem_post(&pRequest->body.rspSem);
29 30 31
  return 0;
}

32 33
int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
  SRequestObj* pRequest = param;
H
Haojun Liao 已提交
34 35 36
  if (code != TSDB_CODE_SUCCESS) {
    pRequest->code = code;
    terrno         = code;
H
Haojun Liao 已提交
37 38

    sem_post(&pRequest->body.rspSem);
H
Haojun Liao 已提交
39 40
    return code;
  }
41

H
Haojun Liao 已提交
42 43
  STscObj *pTscObj = pRequest->pTscObj;

44
  SConnectRsp *pConnect = (SConnectRsp *)pMsg->pData;
H
Haojun Liao 已提交
45 46
  pConnect->acctId    = htonl(pConnect->acctId);
  pConnect->connId    = htonl(pConnect->connId);
47
  pConnect->clusterId = htobe64(pConnect->clusterId);
H
Haojun Liao 已提交
48

49 50 51 52 53
  assert(pConnect->epSet.numOfEps > 0);
  for(int32_t i = 0; i < pConnect->epSet.numOfEps; ++i) {
    pConnect->epSet.port[i] = htons(pConnect->epSet.port[i]);
  }

H
Haojun Liao 已提交
54 55 56 57 58
  if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &pConnect->epSet)) {
    updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &pConnect->epSet);
  }

  for (int i = 0; i < pConnect->epSet.numOfEps; ++i) {
59
    tscDebug("0x%" PRIx64 " epSet.fqdn[%d]:%s port:%d, connObj:0x%"PRIx64, pRequest->requestId, i, pConnect->epSet.fqdn[i], pConnect->epSet.port[i], pTscObj->id);
H
Haojun Liao 已提交
60 61 62
  }

  pTscObj->connId = pConnect->connId;
H
Haojun Liao 已提交
63
  pTscObj->acctId = pConnect->acctId;
H
Haojun Liao 已提交
64 65 66 67 68

  // update the appInstInfo
  pTscObj->pAppInfo->clusterId = pConnect->clusterId;
  atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);

69
  //  pRequest->body.resInfo.pRspMsg = pMsg->pData;
70 71
  tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId,
           pTscObj->pAppInfo->numOfConns);
72 73

  sem_post(&pRequest->body.rspSem);
74 75
  return 0;
}
H
Haojun Liao 已提交
76

77 78 79
SMsgSendInfo* buildSendMsgInfoImpl(SRequestObj *pRequest) {
  SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));

H
Haojun Liao 已提交
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
  pMsgSendInfo->requestObjRefId = pRequest->self;
  pMsgSendInfo->requestId       = pRequest->requestId;
  pMsgSendInfo->param           = pRequest;
  pMsgSendInfo->msgType         = pRequest->type;

  if (pRequest->type == TDMT_MND_SHOW_RETRIEVE || pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) {
    if (pRequest->type == TDMT_MND_SHOW_RETRIEVE) {
      SRetrieveTableMsg* pRetrieveMsg = calloc(1, sizeof(SRetrieveTableMsg));
      if (pRetrieveMsg == NULL) {
        return NULL;
      }

      pRetrieveMsg->showId = htonl(pRequest->body.execId);
      pMsgSendInfo->msgInfo.pData = pRetrieveMsg;
      pMsgSendInfo->msgInfo.len = sizeof(SRetrieveTableMsg);
    } else {
      SVShowTablesFetchReq* pFetchMsg = calloc(1, sizeof(SVShowTablesFetchReq));
      if (pFetchMsg == NULL) {
        return NULL;
      }

      pFetchMsg->id = htonl(pRequest->body.execId);
      pFetchMsg->head.vgId = htonl(13);
      pMsgSendInfo->msgInfo.pData = pFetchMsg;
      pMsgSendInfo->msgInfo.len = sizeof(SVShowTablesFetchReq);
    }
106 107
  } else {
    assert(pRequest != NULL);
108
    pMsgSendInfo->msgInfo   = pRequest->body.requestMsg;
H
Haojun Liao 已提交
109
  }
110

H
Haojun Liao 已提交
111
  pMsgSendInfo->fp = (handleRequestRspFp[TMSG_INDEX(pRequest->type)] == NULL)? genericRspCallback:handleRequestRspFp[TMSG_INDEX(pRequest->type)];
112
  return pMsgSendInfo;
113 114
}

115 116 117 118 119 120 121 122 123
int32_t processShowRsp(void* param, const SDataBuf* pMsg, int32_t code) {
  SRequestObj* pRequest = param;
  if (code != TSDB_CODE_SUCCESS) {
    pRequest->code = code;
    tsem_post(&pRequest->body.rspSem);
    return code;
  }

  SShowRsp* pShow = (SShowRsp *)pMsg->pData;
124 125 126 127 128 129 130 131
  pShow->showId   = htonl(pShow->showId);

  STableMetaMsg *pMetaMsg = &(pShow->tableMeta);
  pMetaMsg->numOfColumns = htonl(pMetaMsg->numOfColumns);

  SSchema* pSchema = pMetaMsg->pSchema;
  pMetaMsg->tuid = htobe64(pMetaMsg->tuid);
  for (int i = 0; i < pMetaMsg->numOfColumns; ++i) {
H
Haojun Liao 已提交
132
    pSchema->bytes = htonl(pSchema->bytes);
H
Haojun Liao 已提交
133
    pSchema->colId = htonl(pSchema->colId);
134 135 136
    pSchema++;
  }

H
Haojun Liao 已提交
137 138 139 140 141 142 143
  pSchema = pMetaMsg->pSchema;
  TAOS_FIELD* pFields = calloc(pMetaMsg->numOfColumns, sizeof(TAOS_FIELD));
  for (int32_t i = 0; i < pMetaMsg->numOfColumns; ++i) {
    tstrncpy(pFields[i].name, pSchema[i].name, tListLen(pFields[i].name));
    pFields[i].type  = pSchema[i].type;
    pFields[i].bytes = pSchema[i].bytes;
  }
144

145
  pRequest->body.resInfo.pRspMsg = pMsg->pData;
146
  SReqResultInfo* pResInfo = &pRequest->body.resInfo;
147

H
Haojun Liao 已提交
148 149 150 151 152 153 154
  pResInfo->fields    = pFields;
  pResInfo->numOfCols = pMetaMsg->numOfColumns;
  pResInfo->row       = calloc(pResInfo->numOfCols, POINTER_BYTES);
  pResInfo->pCol      = calloc(pResInfo->numOfCols, POINTER_BYTES);
  pResInfo->length    = calloc(pResInfo->numOfCols, sizeof(int32_t));

  pRequest->body.execId = pShow->showId;
155
  tsem_post(&pRequest->body.rspSem);
H
Haojun Liao 已提交
156 157 158
  return 0;
}

159 160
int32_t processRetrieveMnodeRsp(void* param, const SDataBuf* pMsg, int32_t code) {
  assert(pMsg->len >= sizeof(SRetrieveTableRsp));
H
Haojun Liao 已提交
161

162
  SRequestObj* pRequest = param;
H
Haojun Liao 已提交
163 164 165 166 167 168 169 170 171 172
  tfree(pRequest->body.resInfo.pRspMsg);

  if (code != TSDB_CODE_SUCCESS) {
    pRequest->code = code;
    terrno = code;
    tsem_post(&pRequest->body.rspSem);
    return code;
  }

  pRequest->body.resInfo.pRspMsg = pMsg->pData;
H
Haojun Liao 已提交
173

174
  SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *) pMsg->pData;
H
Haojun Liao 已提交
175 176 177
  pRetrieve->numOfRows  = htonl(pRetrieve->numOfRows);
  pRetrieve->precision  = htons(pRetrieve->precision);

178
  SReqResultInfo* pResInfo = &pRequest->body.resInfo;
179 180

  pResInfo->pRspMsg   = pMsg->pData;
H
Haojun Liao 已提交
181
  pResInfo->numOfRows = pRetrieve->numOfRows;
H
Haojun Liao 已提交
182
  pResInfo->pData     = pRetrieve->data;
H
Haojun Liao 已提交
183 184 185 186 187 188

  pResInfo->current = 0;
  setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows);

  tscDebug("0x%"PRIx64" numOfRows:%d, complete:%d, qId:0x%"PRIx64, pRequest->self, pRetrieve->numOfRows,
           pRetrieve->completed, pRequest->body.execId);
189 190

  tsem_post(&pRequest->body.rspSem);
H
Haojun Liao 已提交
191 192 193
  return 0;
}

H
Haojun Liao 已提交
194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229
int32_t processRetrieveVndRsp(void* param, const SDataBuf* pMsg, int32_t code) {
  assert(pMsg->len >= sizeof(SRetrieveTableRsp));

  SRequestObj* pRequest = param;
  tfree(pRequest->body.resInfo.pRspMsg);

  if (code != TSDB_CODE_SUCCESS) {
    pRequest->code = code;
    terrno = code;
    tsem_post(&pRequest->body.rspSem);
    return code;
  }

  pRequest->body.resInfo.pRspMsg = pMsg->pData;

  SVShowTablesFetchRsp *pFetchRsp = (SVShowTablesFetchRsp *) pMsg->pData;
  pFetchRsp->numOfRows  = htonl(pFetchRsp->numOfRows);
  pFetchRsp->precision  = htons(pFetchRsp->precision);

  SReqResultInfo* pResInfo = &pRequest->body.resInfo;

  tfree(pResInfo->pRspMsg);
  pResInfo->pRspMsg   = pMsg->pData;
  pResInfo->numOfRows = pFetchRsp->numOfRows;
  pResInfo->pData     = pFetchRsp->data;

  pResInfo->current = 0;
  setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows);

  tscDebug("0x%"PRIx64" numOfRows:%d, complete:%d, qId:0x%"PRIx64, pRequest->self, pFetchRsp->numOfRows,
           pFetchRsp->completed, pRequest->body.execId);

  tsem_post(&pRequest->body.rspSem);
  return 0;
}

230
int32_t processCreateDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
H
Haojun Liao 已提交
231
  // todo rsp with the vnode id list
232 233
  SRequestObj* pRequest = param;
  tsem_post(&pRequest->body.rspSem);
H
Haojun Liao 已提交
234
}
H
Haojun Liao 已提交
235

236 237
int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
  SUseDbRsp* pUseDbRsp = (SUseDbRsp*) pMsg->pData;
238 239 240 241 242
  SName name = {0};
  tNameFromString(&name, pUseDbRsp->db, T_NAME_ACCT|T_NAME_DB);

  char db[TSDB_DB_NAME_LEN] = {0};
  tNameGetDbName(&name, db);
243 244

  SRequestObj* pRequest = param;
245
  setConnectionDB(pRequest->pTscObj, db);
246 247 248

  tsem_post(&pRequest->body.rspSem);
  return 0;
249 250
}

251
int32_t processCreateTableRsp(void* param, const SDataBuf* pMsg, int32_t code) {
252
  assert(pMsg != NULL);
253 254
  SRequestObj* pRequest = param;
  tsem_post(&pRequest->body.rspSem);
255 256
}

257
int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
258
  // todo: Remove cache in catalog cache.
259 260
  SRequestObj* pRequest = param;
  tsem_post(&pRequest->body.rspSem);
261 262
}

H
Haojun Liao 已提交
263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287
void initMsgHandleFp() {
#if 0
  tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
  tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
  tscBuildMsg[TSDB_SQL_FETCH] = tscBuildFetchMsg;

  tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
  tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
  tscBuildMsg[TSDB_SQL_CREATE_FUNCTION] = tscBuildCreateFuncMsg;

  tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg;
  tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg;

  tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg;
  tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropUserAcctMsg;
  tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropUserAcctMsg;
  tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg;
  tscBuildMsg[TSDB_SQL_DROP_FUNCTION] = tscBuildDropFuncMsg;
  tscBuildMsg[TSDB_SQL_SYNC_DB_REPLICA] = tscBuildSyncDbReplicaMsg;
  tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg;
  tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg;
  tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg;
  tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
  tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
  tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
H
Haojun Liao 已提交
288
  tscBuildMsg[TSDB_SQL_UPDATE_TAG_VAL] = tscBuildUpdateTagMsg;
H
Haojun Liao 已提交
289 290 291 292 293 294 295 296 297 298
  tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;
  tscBuildMsg[TSDB_SQL_COMPACT_VNODE] = tscBuildCompactMsg;


  tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
  tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg;
  tscBuildMsg[TSDB_SQL_RETRIEVE_FUNC] = tscBuildRetrieveFuncMsg;

  tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
  tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
H
Haojun Liao 已提交
299
  tscBuildMsg[TSDB_SQL_RETRIEVE_MNODE] = tscBuildRetrieveFromMgmtMsg;
H
Haojun Liao 已提交
300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316
  tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;

  tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp;
  tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromNode;

  tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp;
  tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp;

  tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
  tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp;
  tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp;
  tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiTableMetaRsp;
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_FUNC] = tscProcessRetrieveFuncRsp;

  tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
H
Haojun Liao 已提交
317
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_MNODE] = tscProcessRetrieveRspFromNode;  // rsp handled by same function.
H
Haojun Liao 已提交
318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338
  tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;

  tscProcessMsgRsp[TSDB_SQL_CURRENT_DB]   = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CLI_VERSION]  = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_STATUS]  = tscProcessLocalRetrieveRsp;

  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;

  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_GLOBALMERGE] = tscProcessRetrieveGlobalMergeRsp;

  tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
  tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;
  tscProcessMsgRsp[TSDB_SQL_COMPACT_VNODE] = tscProcessCompactRsp;

  tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_TABLE] = tscProcessShowCreateRsp;
  tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_STABLE] = tscProcessShowCreateRsp;
  tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_DATABASE] = tscProcessShowCreateRsp;
#endif

D
catalog  
dapan1121 已提交
339 340 341 342 343 344 345
  handleRequestRspFp[TMSG_INDEX(TDMT_MND_CONNECT)]       = processConnectRsp;
  handleRequestRspFp[TMSG_INDEX(TDMT_MND_SHOW)]          = processShowRsp;
  handleRequestRspFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = processRetrieveMnodeRsp;
  handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_DB)]     = processCreateDbRsp;
  handleRequestRspFp[TMSG_INDEX(TDMT_MND_USE_DB)]        = processUseDbRsp;
  handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_STB)]    = processCreateTableRsp;
  handleRequestRspFp[TMSG_INDEX(TDMT_MND_DROP_DB)]       = processDropDbRsp;
H
Haojun Liao 已提交
346 347 348

  handleRequestRspFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES)]   = processShowRsp;
  handleRequestRspFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES_FETCH)]   = processRetrieveVndRsp;
H
Haojun Liao 已提交
349
}