clientMsgHandler.c 11.3 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Haojun Liao 已提交
16
#include "os.h"
H
Haojun Liao 已提交
17
#include "tdef.h"
H
Haojun Liao 已提交
18
#include "tname.h"
H
Haojun Liao 已提交
19
#include "clientInt.h"
20
#include "clientLog.h"
H
Haojun Liao 已提交
21 22
#include "trpc.h"

H
Haojun Liao 已提交
23
// Response handler table, indexed by TMSG_INDEX(msgType). The slots are filled in by initMsgHandleFp().
// buildSendMsgInfoImpl() substitutes genericRspCallback when the slot for a request type is NULL.
int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code);
H
Haojun Liao 已提交
24

H
Haojun Liao 已提交
25
int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code) {
26 27 28
  SRequestObj* pRequest = param;
  pRequest->code = code;
  sem_post(&pRequest->body.rspSem);
29 30 31
  return 0;
}

32 33
int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
  SRequestObj* pRequest = param;
H
Haojun Liao 已提交
34 35 36
  if (code != TSDB_CODE_SUCCESS) {
    pRequest->code = code;
    terrno         = code;
H
Haojun Liao 已提交
37 38

    sem_post(&pRequest->body.rspSem);
H
Haojun Liao 已提交
39 40
    return code;
  }
41

H
Haojun Liao 已提交
42 43
  STscObj *pTscObj = pRequest->pTscObj;

44
  SConnectRsp *pConnect = (SConnectRsp *)pMsg->pData;
H
Haojun Liao 已提交
45 46
  pConnect->acctId    = htonl(pConnect->acctId);
  pConnect->connId    = htonl(pConnect->connId);
47
  pConnect->clusterId = htobe64(pConnect->clusterId);
H
Haojun Liao 已提交
48

49 50 51 52 53
  assert(pConnect->epSet.numOfEps > 0);
  for(int32_t i = 0; i < pConnect->epSet.numOfEps; ++i) {
    pConnect->epSet.port[i] = htons(pConnect->epSet.port[i]);
  }

H
Haojun Liao 已提交
54 55 56 57 58
  if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &pConnect->epSet)) {
    updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &pConnect->epSet);
  }

  for (int i = 0; i < pConnect->epSet.numOfEps; ++i) {
59
    tscDebug("0x%" PRIx64 " epSet.fqdn[%d]:%s port:%d, connObj:0x%"PRIx64, pRequest->requestId, i, pConnect->epSet.fqdn[i], pConnect->epSet.port[i], pTscObj->id);
H
Haojun Liao 已提交
60 61 62
  }

  pTscObj->connId = pConnect->connId;
H
Haojun Liao 已提交
63
  pTscObj->acctId = pConnect->acctId;
H
Haojun Liao 已提交
64 65 66 67 68

  // update the appInstInfo
  pTscObj->pAppInfo->clusterId = pConnect->clusterId;
  atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);

69
  //  pRequest->body.resInfo.pRspMsg = pMsg->pData;
70 71
  tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId,
           pTscObj->pAppInfo->numOfConns);
72 73

  sem_post(&pRequest->body.rspSem);
74 75
  return 0;
}
H
Haojun Liao 已提交
76

77
/*
 * Fill pMsgSendInfo for a TDMT_MND_SHOW_RETRIEVE request.
 *
 * The payload (an SRetrieveTableMsg carrying the show id taken from
 * pRequest->body.execId) is allocated first, so that pMsgSendInfo is left
 * untouched when the allocation fails.
 *
 * pRequest      request being sent; also installed as the callback parameter
 * pMsgSendInfo  send descriptor to populate
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY if the payload
 * cannot be allocated.
 */
static int32_t buildRetrieveMnodeMsg(SRequestObj *pRequest, SMsgSendInfo* pMsgSendInfo) {
  SRetrieveTableMsg *pRetrieveMsg = calloc(1, sizeof(SRetrieveTableMsg));
  if (pRetrieveMsg == NULL) {
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pRetrieveMsg->showId = htonl(pRequest->body.execId);

  pMsgSendInfo->msgType         = TDMT_MND_SHOW_RETRIEVE;
  pMsgSendInfo->msgInfo.len     = sizeof(SRetrieveTableMsg);
  pMsgSendInfo->msgInfo.pData   = pRetrieveMsg;
  pMsgSendInfo->requestObjRefId = pRequest->self;
  // set the request id as well, as buildSendMsgInfoImpl() does for every other message type
  pMsgSendInfo->requestId       = pRequest->requestId;
  pMsgSendInfo->param           = pRequest;
  pMsgSendInfo->fp              = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)];

  return TSDB_CODE_SUCCESS;
}

94 95 96
/*
 * Allocate and populate the send descriptor for a request.
 *
 * TDMT_MND_SHOW_RETRIEVE requests get their payload built by
 * buildRetrieveMnodeMsg(); every other request type reuses the message that
 * is already stored in pRequest->body.requestMsg. The response callback is
 * looked up in handleRequestRspFp, falling back to genericRspCallback when no
 * handler is registered for the type.
 *
 * pRequest  request to send; must not be NULL
 *
 * Returns the new descriptor, or NULL with terrno set when the descriptor or
 * its payload cannot be allocated.
 */
SMsgSendInfo* buildSendMsgInfoImpl(SRequestObj *pRequest) {
  // validate before the first dereference of pRequest
  assert(pRequest != NULL);

  SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  if (pRequest->type == TDMT_MND_SHOW_RETRIEVE) {
    int32_t code = buildRetrieveMnodeMsg(pRequest, pMsgSendInfo);
    if (code != TSDB_CODE_SUCCESS) {
      // do not hand out a descriptor without a payload
      free(pMsgSendInfo);
      terrno = code;
      return NULL;
    }

    return pMsgSendInfo;
  }

  pMsgSendInfo->requestObjRefId = pRequest->self;
  pMsgSendInfo->msgInfo         = pRequest->body.requestMsg;
  pMsgSendInfo->msgType         = pRequest->type;
  pMsgSendInfo->requestId       = pRequest->requestId;
  pMsgSendInfo->param           = pRequest;

  int (*fp)(void*, const SDataBuf*, int32_t) = handleRequestRspFp[TMSG_INDEX(pRequest->type)];
  pMsgSendInfo->fp = (fp == NULL) ? genericRspCallback : fp;

  return pMsgSendInfo;
}

113 114 115 116 117 118 119 120 121
int32_t processShowRsp(void* param, const SDataBuf* pMsg, int32_t code) {
  SRequestObj* pRequest = param;
  if (code != TSDB_CODE_SUCCESS) {
    pRequest->code = code;
    tsem_post(&pRequest->body.rspSem);
    return code;
  }

  SShowRsp* pShow = (SShowRsp *)pMsg->pData;
122 123 124 125 126 127 128 129
  pShow->showId   = htonl(pShow->showId);

  STableMetaMsg *pMetaMsg = &(pShow->tableMeta);
  pMetaMsg->numOfColumns = htonl(pMetaMsg->numOfColumns);

  SSchema* pSchema = pMetaMsg->pSchema;
  pMetaMsg->tuid = htobe64(pMetaMsg->tuid);
  for (int i = 0; i < pMetaMsg->numOfColumns; ++i) {
H
Haojun Liao 已提交
130
    pSchema->bytes = htonl(pSchema->bytes);
131 132 133
    pSchema++;
  }

H
Haojun Liao 已提交
134 135 136 137 138 139 140
  pSchema = pMetaMsg->pSchema;
  TAOS_FIELD* pFields = calloc(pMetaMsg->numOfColumns, sizeof(TAOS_FIELD));
  for (int32_t i = 0; i < pMetaMsg->numOfColumns; ++i) {
    tstrncpy(pFields[i].name, pSchema[i].name, tListLen(pFields[i].name));
    pFields[i].type  = pSchema[i].type;
    pFields[i].bytes = pSchema[i].bytes;
  }
141

142
  pRequest->body.resInfo.pRspMsg = pMsg->pData;
143
  SReqResultInfo* pResInfo = &pRequest->body.resInfo;
144

H
Haojun Liao 已提交
145 146 147 148 149 150 151
  pResInfo->fields    = pFields;
  pResInfo->numOfCols = pMetaMsg->numOfColumns;
  pResInfo->row       = calloc(pResInfo->numOfCols, POINTER_BYTES);
  pResInfo->pCol      = calloc(pResInfo->numOfCols, POINTER_BYTES);
  pResInfo->length    = calloc(pResInfo->numOfCols, sizeof(int32_t));

  pRequest->body.execId = pShow->showId;
152
  tsem_post(&pRequest->body.rspSem);
H
Haojun Liao 已提交
153 154 155
  return 0;
}

156 157
int32_t processRetrieveMnodeRsp(void* param, const SDataBuf* pMsg, int32_t code) {
  assert(pMsg->len >= sizeof(SRetrieveTableRsp));
H
Haojun Liao 已提交
158

159 160 161
  SRequestObj* pRequest = param;
//  tfree(pRequest->body.resInfo.pRspMsg);
//  pRequest->body.resInfo.pRspMsg = pMsg->pData;
H
Haojun Liao 已提交
162

163
  SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *) pMsg->pData;
H
Haojun Liao 已提交
164 165 166
  pRetrieve->numOfRows  = htonl(pRetrieve->numOfRows);
  pRetrieve->precision  = htons(pRetrieve->precision);

167
  SReqResultInfo* pResInfo = &pRequest->body.resInfo;
168 169 170

  tfree(pResInfo->pRspMsg);
  pResInfo->pRspMsg   = pMsg->pData;
H
Haojun Liao 已提交
171
  pResInfo->numOfRows = pRetrieve->numOfRows;
172
  pResInfo->pData     = pRetrieve->data;              // todo fix this in async model
H
Haojun Liao 已提交
173 174 175 176 177 178

  pResInfo->current = 0;
  setResultDataPtr(pResInfo, pResInfo->fields, pResInfo->numOfCols, pResInfo->numOfRows);

  tscDebug("0x%"PRIx64" numOfRows:%d, complete:%d, qId:0x%"PRIx64, pRequest->self, pRetrieve->numOfRows,
           pRetrieve->completed, pRequest->body.execId);
179 180

  tsem_post(&pRequest->body.rspSem);
H
Haojun Liao 已提交
181 182 183
  return 0;
}

184
int32_t processCreateDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
H
Haojun Liao 已提交
185
  // todo rsp with the vnode id list
186 187
  SRequestObj* pRequest = param;
  tsem_post(&pRequest->body.rspSem);
H
Haojun Liao 已提交
188
}
H
Haojun Liao 已提交
189

190 191
int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
  SUseDbRsp* pUseDbRsp = (SUseDbRsp*) pMsg->pData;
192 193 194 195 196
  SName name = {0};
  tNameFromString(&name, pUseDbRsp->db, T_NAME_ACCT|T_NAME_DB);

  char db[TSDB_DB_NAME_LEN] = {0};
  tNameGetDbName(&name, db);
197 198

  SRequestObj* pRequest = param;
199
  setConnectionDB(pRequest->pTscObj, db);
200 201 202

  tsem_post(&pRequest->body.rspSem);
  return 0;
203 204
}

205
int32_t processCreateTableRsp(void* param, const SDataBuf* pMsg, int32_t code) {
206
  assert(pMsg != NULL);
207 208
  SRequestObj* pRequest = param;
  tsem_post(&pRequest->body.rspSem);
209 210
}

211
int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
212
  // todo: Remove cache in catalog cache.
213 214
  SRequestObj* pRequest = param;
  tsem_post(&pRequest->body.rspSem);
215 216
}

H
Haojun Liao 已提交
217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241
void initMsgHandleFp() {
#if 0
  tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
  tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
  tscBuildMsg[TSDB_SQL_FETCH] = tscBuildFetchMsg;

  tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
  tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
  tscBuildMsg[TSDB_SQL_CREATE_FUNCTION] = tscBuildCreateFuncMsg;

  tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg;
  tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg;

  tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg;
  tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropUserAcctMsg;
  tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropUserAcctMsg;
  tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg;
  tscBuildMsg[TSDB_SQL_DROP_FUNCTION] = tscBuildDropFuncMsg;
  tscBuildMsg[TSDB_SQL_SYNC_DB_REPLICA] = tscBuildSyncDbReplicaMsg;
  tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg;
  tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg;
  tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg;
  tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
  tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
  tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
H
Haojun Liao 已提交
242
  tscBuildMsg[TSDB_SQL_UPDATE_TAG_VAL] = tscBuildUpdateTagMsg;
H
Haojun Liao 已提交
243 244 245 246 247 248 249 250 251 252
  tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;
  tscBuildMsg[TSDB_SQL_COMPACT_VNODE] = tscBuildCompactMsg;


  tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
  tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg;
  tscBuildMsg[TSDB_SQL_RETRIEVE_FUNC] = tscBuildRetrieveFuncMsg;

  tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
  tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
H
Haojun Liao 已提交
253
  tscBuildMsg[TSDB_SQL_RETRIEVE_MNODE] = tscBuildRetrieveFromMgmtMsg;
H
Haojun Liao 已提交
254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270
  tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;

  tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp;
  tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromNode;

  tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp;
  tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp;

  tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
  tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp;
  tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp;
  tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiTableMetaRsp;
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_FUNC] = tscProcessRetrieveFuncRsp;

  tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
H
Haojun Liao 已提交
271
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_MNODE] = tscProcessRetrieveRspFromNode;  // rsp handled by same function.
H
Haojun Liao 已提交
272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292
  tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;

  tscProcessMsgRsp[TSDB_SQL_CURRENT_DB]   = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CLI_VERSION]  = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_STATUS]  = tscProcessLocalRetrieveRsp;

  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;

  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_GLOBALMERGE] = tscProcessRetrieveGlobalMergeRsp;

  tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
  tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;
  tscProcessMsgRsp[TSDB_SQL_COMPACT_VNODE] = tscProcessCompactRsp;

  tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_TABLE] = tscProcessShowCreateRsp;
  tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_STABLE] = tscProcessShowCreateRsp;
  tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_DATABASE] = tscProcessShowCreateRsp;
#endif

D
catalog  
dapan1121 已提交
293 294 295 296 297 298 299
  handleRequestRspFp[TMSG_INDEX(TDMT_MND_CONNECT)]       = processConnectRsp;
  handleRequestRspFp[TMSG_INDEX(TDMT_MND_SHOW)]          = processShowRsp;
  handleRequestRspFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = processRetrieveMnodeRsp;
  handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_DB)]     = processCreateDbRsp;
  handleRequestRspFp[TMSG_INDEX(TDMT_MND_USE_DB)]        = processUseDbRsp;
  handleRequestRspFp[TMSG_INDEX(TDMT_MND_CREATE_STB)]    = processCreateTableRsp;
  handleRequestRspFp[TMSG_INDEX(TDMT_MND_DROP_DB)]       = processDropDbRsp;
H
Haojun Liao 已提交
300
}