You need to sign in or sign up before continuing.
clientMsgHandler.c 12.3 KB
Newer Older
H
Haojun Liao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

H
Haojun Liao 已提交
16
#include "os.h"
H
Haojun Liao 已提交
17
#include "tdef.h"
H
Haojun Liao 已提交
18
#include "tname.h"
H
Haojun Liao 已提交
19
#include "clientInt.h"
20
#include "clientLog.h"
H
Haojun Liao 已提交
21 22 23 24 25 26
#include "tmsgtype.h"
#include "trpc.h"

// Per-request-type message builders, indexed by the TSDB_SQL_* request type.
// Zero-initialized here; entries are filled in by initMsgHandleFp().
int (*buildRequestMsgFp[TSDB_SQL_MAX])(SRequestObj *pRequest, SRequestMsgBody *pMsgBody) = {0};
// Per-request-type response handlers, indexed by the TSDB_SQL_* request type.
// Entries are filled in by initMsgHandleFp(); not every type that has a builder
// also gets a handler there.
int (*handleRequestRspFp[TSDB_SQL_MAX])(SRequestObj *pRequest, const char* pMsg, int32_t msgLen);

27 28
/*
 * Fill pMsgBody with a newly allocated connect message for pRequest.
 *
 * The message carries the database name of the connection (with anything up
 * to and including the first TS_PATH_DELIMITER removed), plus the pid, start
 * time and application name taken from appInfo.
 *
 * Returns 0 on success. On allocation failure terrno is set to
 * TSDB_CODE_TSC_OUT_OF_MEMORY and -1 is returned; the type, length and
 * reference id of pMsgBody have already been written at that point.
 */
int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) {
  pMsgBody->msgType         = TSDB_MSG_TYPE_CONNECT;
  pMsgBody->msgInfo.len     = sizeof(SConnectMsg);
  pMsgBody->requestObjRefId = pRequest->self;

  SConnectMsg *pConnectMsg = calloc(1, sizeof(SConnectMsg));
  if (NULL == pConnectMsg) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return -1;
  }

  // process-wide information, in network byte order where numeric
  tstrncpy(pConnectMsg->app, appInfo.appName, tListLen(pConnectMsg->app));
  pConnectMsg->startTime = htobe64(appInfo.startTime);
  pConnectMsg->pid       = htonl(appInfo.pid);

  // TODO refactor full_name
  // the connection's db is read under the connection mutex
  STscObj *pTscObj = pRequest->pTscObj;
  pthread_mutex_lock(&pTscObj->mutex);

  char *pName = strstr(pTscObj->db, TS_PATH_DELIMITER);
  if (pName != NULL) {
    pName = pName + 1;  // skip the delimiter itself
  } else {
    pName = pTscObj->db;
  }

  tstrncpy(pConnectMsg->db, pName, sizeof(pConnectMsg->db));
  pthread_mutex_unlock(&pTscObj->mutex);

  pMsgBody->msgInfo.pMsg = pConnectMsg;
  return 0;
}

H
Haojun Liao 已提交
57 58 59 60 61 62 63 64
/*
 * Handle the response to a connect request.
 *
 * The response in pMsg is byte-swapped in place (acctId, connId, clusterId and
 * every endpoint port). The connection's db name is then rewritten as
 * "<acctId><TS_PATH_DELIMITER><db>", the management endpoint set of the app
 * instance is replaced if it differs from the one in the response, and the
 * connection id, account id, cluster id and connection counter are updated.
 * pMsg is finally stored in pRequest->body.resInfo.pRspMsg.
 *
 * msgLen is not inspected. Always returns 0.
 */
int processConnectRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) {
  STscObj *pTscObj = pRequest->pTscObj;

  // the response buffer is converted in place
  SConnectRsp *pConnect = (SConnectRsp *)pMsg;
  pConnect->acctId    = htonl(pConnect->acctId);
  pConnect->connId    = htonl(pConnect->connId);
  pConnect->clusterId = htonl(pConnect->clusterId);

  assert(pConnect->epSet.numOfEps > 0);
  for (int32_t i = 0; i < pConnect->epSet.numOfEps; ++i) {
    pConnect->epSet.port[i] = htons(pConnect->epSet.port[i]);
  }

  // TODO refactor
  pthread_mutex_lock(&pTscObj->mutex);
  char temp[TSDB_TABLE_FNAME_LEN * 2] = {0};

  // NOTE(review): the prefix uses pTscObj->acctId as it was BEFORE it is
  // refreshed from the response further down - confirm this is intended.
  // snprintf bounds the write to temp regardless of the length of pTscObj->db.
  int32_t len = snprintf(temp, sizeof(temp), "%d%s%s", pTscObj->acctId, TS_PATH_DELIMITER, pTscObj->db);

  // the name and its terminator must both fit into pTscObj->db
  assert(len >= 0 && (size_t)len < sizeof(pTscObj->db));
  (void)len;  // only read by the assert above
  tstrncpy(pTscObj->db, temp, sizeof(pTscObj->db));
  pthread_mutex_unlock(&pTscObj->mutex);

  if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &pConnect->epSet)) {
    updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &pConnect->epSet);
  }

  for (int i = 0; i < pConnect->epSet.numOfEps; ++i) {
    tscDebug("0x%" PRIx64 " epSet.fqdn[%d]:%s port:%d, connObj:0x%"PRIx64, pRequest->requestId, i, pConnect->epSet.fqdn[i], pConnect->epSet.port[i], pTscObj->id);
  }

  pTscObj->connId = pConnect->connId;
  pTscObj->acctId = pConnect->acctId;

  // update the appInstInfo
  pTscObj->pAppInfo->clusterId = pConnect->clusterId;
  atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);

  pRequest->body.resInfo.pRspMsg = pMsg;
  tscDebug("0x%" PRIx64 " clusterId:%d, totalConn:%"PRId64, pRequest->requestId, pConnect->clusterId, pTscObj->pAppInfo->numOfConns);
  return 0;
}
H
Haojun Liao 已提交
98

H
Haojun Liao 已提交
99
/*
 * Generic message builder for requests whose payload has already been
 * prepared in pRequest->body.requestMsg.
 *
 * Copies the prepared payload descriptor and the request reference id into
 * pMsgBody and selects the message type from pRequest->type. For
 * TSDB_SQL_CREATE_DB the db name inside the payload is additionally rewritten
 * in place: an SName is built from the connection's acctId and the name found
 * in the payload, and tNameGetFullDbName() writes its result back into the
 * payload.
 *
 * Returns TSDB_CODE_SUCCESS, or -1 if tNameSetDbName() rejects the db name.
 */
int32_t doBuildMsgSupp(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) {
  pMsgBody->requestObjRefId = pRequest->self;
  pMsgBody->msgInfo         = pRequest->body.requestMsg;

  switch (pRequest->type) {
    case TSDB_SQL_CREATE_USER:
      pMsgBody->msgType = TSDB_MSG_TYPE_CREATE_USER;
      break;
    case TSDB_SQL_DROP_USER:
      pMsgBody->msgType = TSDB_MSG_TYPE_DROP_USER;
      break;
    case TSDB_SQL_CREATE_ACCT:
      pMsgBody->msgType = TSDB_MSG_TYPE_CREATE_ACCT;
      break;
    case TSDB_SQL_DROP_ACCT:
      pMsgBody->msgType = TSDB_MSG_TYPE_DROP_ACCT;
      break;
    case TSDB_SQL_CREATE_DB: {
      pMsgBody->msgType = TSDB_MSG_TYPE_CREATE_DB;

      SCreateDbMsg* pCreateMsg = pRequest->body.requestMsg.pMsg;
      SName name = {0};
      int32_t ret = tNameSetDbName(&name, pRequest->pTscObj->acctId, pCreateMsg->db, strnlen(pCreateMsg->db, tListLen(pCreateMsg->db)));
      if (ret != TSDB_CODE_SUCCESS) {
        return -1;
      }

      tNameGetFullDbName(&name, pCreateMsg->db);
      break;
    }
    case TSDB_SQL_USE_DB: {
      pMsgBody->msgType = TSDB_MSG_TYPE_USE_DB;
      break;
    }
    case TSDB_SQL_CREATE_TABLE: {
      pMsgBody->msgType = TSDB_MSG_TYPE_CREATE_STB;
      break;
    }
    case TSDB_SQL_SHOW:
      pMsgBody->msgType = TSDB_MSG_TYPE_SHOW;
      break;
    default:
      // any other request type leaves pMsgBody->msgType untouched
      break;
  }

  // the function used to fall off the end here, leaving the result undefined
  return TSDB_CODE_SUCCESS;
}

/*
 * Handle the response to a SHOW request.
 *
 * The response in pMsg is byte-swapped in place (showId, numOfColumns, tuid
 * and the byte width of every column schema). A TAOS_FIELD array describing
 * the columns and the per-column row/pCol/length arrays are allocated and
 * attached to pRequest->body.resInfo, pMsg is stored as its pRspMsg, and the
 * show id is stored as pRequest->body.execId.
 *
 * msgLen is not inspected.
 *
 * Returns 0 on success. If any of the per-column arrays cannot be allocated,
 * terrno is set to TSDB_CODE_TSC_OUT_OF_MEMORY and -1 is returned; the fields,
 * numOfCols, row, pCol and length members of the result info and execId are
 * left untouched in that case.
 */
int32_t processShowRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) {
  SShowRsp* pShow = (SShowRsp *)pMsg;
  pShow->showId   = htonl(pShow->showId);

  STableMetaMsg *pMetaMsg = &(pShow->tableMeta);
  pMetaMsg->numOfColumns = htonl(pMetaMsg->numOfColumns);
  pMetaMsg->tuid = htobe64(pMetaMsg->tuid);

  SSchema* pSchema = pMetaMsg->pSchema;
  for (int i = 0; i < pMetaMsg->numOfColumns; ++i) {
    pSchema[i].bytes = htonl(pSchema[i].bytes);
  }

  // NOTE(review): the response buffer is attached before anything can fail so
  // that it ends up in the same place on every path - confirm against the
  // caller's ownership rules for pMsg on error.
  SReqResultInfo* pResInfo = &pRequest->body.resInfo;
  pResInfo->pRspMsg = pMsg;

  // allocate everything up front so a failure leaves the result info untouched
  TAOS_FIELD* pFields = calloc(pMetaMsg->numOfColumns, sizeof(TAOS_FIELD));
  void*       pRow    = calloc(pMetaMsg->numOfColumns, POINTER_BYTES);
  void*       pCol    = calloc(pMetaMsg->numOfColumns, POINTER_BYTES);
  void*       pLength = calloc(pMetaMsg->numOfColumns, sizeof(int32_t));

  // calloc may legitimately return NULL for a zero-sized request, so a NULL
  // result only counts as a failure when there are columns to describe
  if (pMetaMsg->numOfColumns > 0 &&
      (pFields == NULL || pRow == NULL || pCol == NULL || pLength == NULL)) {
    free(pFields);
    free(pRow);
    free(pCol);
    free(pLength);

    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return -1;
  }

  for (int32_t i = 0; i < pMetaMsg->numOfColumns; ++i) {
    tstrncpy(pFields[i].name, pSchema[i].name, tListLen(pFields[i].name));
    pFields[i].type  = pSchema[i].type;
    pFields[i].bytes = pSchema[i].bytes;
  }

  pResInfo->fields    = pFields;
  pResInfo->numOfCols = pMetaMsg->numOfColumns;
  pResInfo->row       = pRow;
  pResInfo->pCol      = pCol;
  pResInfo->length    = pLength;

  pRequest->body.execId = pShow->showId;
  return 0;
}

H
Haojun Liao 已提交
178 179
/*
 * Fill pMsgBody with a newly allocated retrieve message that asks for the
 * rows of the show operation identified by pRequest->body.execId.
 *
 * Returns TSDB_CODE_SUCCESS on success. On allocation failure terrno is set
 * to TSDB_CODE_TSC_OUT_OF_MEMORY and -1 is returned; the type, length and
 * reference id of pMsgBody have already been written at that point.
 */
int buildRetrieveMnodeMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) {
  pMsgBody->msgType = TSDB_MSG_TYPE_SHOW_RETRIEVE;
  pMsgBody->msgInfo.len = sizeof(SRetrieveTableMsg);
  pMsgBody->requestObjRefId = pRequest->self;

  SRetrieveTableMsg *pRetrieveMsg = calloc(1, sizeof(SRetrieveTableMsg));
  if (pRetrieveMsg == NULL) {
    // same failure convention as buildConnectMsg
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return -1;
  }

  pRetrieveMsg->showId  = htonl(pRequest->body.execId);

  pMsgBody->msgInfo.pMsg = pRetrieveMsg;
  return TSDB_CODE_SUCCESS;
}

/*
 * Handle the response to a retrieve request.
 *
 * The response buffer previously attached to the request is released and
 * replaced by pMsg. numOfRows and precision are byte-swapped in place, the
 * result info is pointed at the row data inside pMsg, its cursor is reset to
 * the first row, and setResultDataPtr() is called with the result info's
 * field descriptions, column count and row count.
 *
 * Always returns 0.
 */
int32_t processRetrieveMnodeRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) {
  assert(msgLen >= sizeof(SRetrieveTableRsp));

  SReqResultInfo* pInfo = &pRequest->body.resInfo;

  // swap the attached response buffer for the new one
  tfree(pInfo->pRspMsg);
  pInfo->pRspMsg = pMsg;

  // numeric header fields are converted in place
  SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *) pMsg;
  pRsp->precision = htons(pRsp->precision);
  pRsp->numOfRows = htonl(pRsp->numOfRows);

  pInfo->current   = 0;
  pInfo->pData     = pRsp->data;              // todo fix this in async model
  pInfo->numOfRows = pRsp->numOfRows;

  setResultDataPtr(pInfo, pInfo->fields, pInfo->numOfCols, pInfo->numOfRows);

  tscDebug("0x%"PRIx64" numOfRows:%d, complete:%d, qId:0x%"PRIx64, pRequest->self, pRsp->numOfRows,
           pRsp->completed, pRequest->body.execId);
  return 0;
}

H
Haojun Liao 已提交
212 213 214
/*
 * Handle the response to a create-database request.
 *
 * Nothing is read from the response yet; the function only reports success.
 * Returns 0.
 */
int32_t processCreateDbRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) {
  // todo rsp with the vnode id list
  return 0;  // previously fell off the end, leaving the result undefined
}
H
Haojun Liao 已提交
215

216 217 218 219 220 221 222 223 224 225 226 227 228 229
/*
 * Handle the response to a use-database request.
 *
 * The db name carried in the response is parsed into an SName, the plain
 * database name is extracted from it, and that name is set on the connection
 * via setConnectionDB().
 *
 * Returns 0.
 */
int32_t processUseDbRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) {
  SUseDbRsp* pUseDbRsp = (SUseDbRsp*) pMsg;
  SName name = {0};
  // NOTE(review): the outcome of tNameFromString() is not checked - confirm
  // whether a malformed name in the response needs to be rejected here.
  tNameFromString(&name, pUseDbRsp->db, T_NAME_ACCT|T_NAME_DB);

  char db[TSDB_DB_NAME_LEN] = {0};
  tNameGetDbName(&name, db);
  setConnectionDB(pRequest->pTscObj, db);

  return 0;  // previously fell off the end, leaving the result undefined
}

/*
 * Handle the response to a create-table request.
 *
 * Only asserts that a response buffer was delivered; nothing is read from it.
 * Returns 0.
 */
int32_t processCreateTableRsp(SRequestObj *pRequest, const char* pMsg, int32_t msgLen) {
  assert(pMsg != NULL);
  return 0;  // previously fell off the end, leaving the result undefined
}

H
Haojun Liao 已提交
230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254
void initMsgHandleFp() {
#if 0
  tscBuildMsg[TSDB_SQL_SELECT] = tscBuildQueryMsg;
  tscBuildMsg[TSDB_SQL_INSERT] = tscBuildSubmitMsg;
  tscBuildMsg[TSDB_SQL_FETCH] = tscBuildFetchMsg;

  tscBuildMsg[TSDB_SQL_CREATE_DB] = tscBuildCreateDbMsg;
  tscBuildMsg[TSDB_SQL_CREATE_USER] = tscBuildUserMsg;
  tscBuildMsg[TSDB_SQL_CREATE_FUNCTION] = tscBuildCreateFuncMsg;

  tscBuildMsg[TSDB_SQL_CREATE_ACCT] = tscBuildAcctMsg;
  tscBuildMsg[TSDB_SQL_ALTER_ACCT] = tscBuildAcctMsg;

  tscBuildMsg[TSDB_SQL_CREATE_TABLE] = tscBuildCreateTableMsg;
  tscBuildMsg[TSDB_SQL_DROP_USER] = tscBuildDropUserAcctMsg;
  tscBuildMsg[TSDB_SQL_DROP_ACCT] = tscBuildDropUserAcctMsg;
  tscBuildMsg[TSDB_SQL_DROP_DB] = tscBuildDropDbMsg;
  tscBuildMsg[TSDB_SQL_DROP_FUNCTION] = tscBuildDropFuncMsg;
  tscBuildMsg[TSDB_SQL_SYNC_DB_REPLICA] = tscBuildSyncDbReplicaMsg;
  tscBuildMsg[TSDB_SQL_DROP_TABLE] = tscBuildDropTableMsg;
  tscBuildMsg[TSDB_SQL_ALTER_USER] = tscBuildUserMsg;
  tscBuildMsg[TSDB_SQL_CREATE_DNODE] = tscBuildCreateDnodeMsg;
  tscBuildMsg[TSDB_SQL_DROP_DNODE] = tscBuildDropDnodeMsg;
  tscBuildMsg[TSDB_SQL_CFG_DNODE] = tscBuildCfgDnodeMsg;
  tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
H
Haojun Liao 已提交
255
  tscBuildMsg[TSDB_SQL_UPDATE_TAG_VAL] = tscBuildUpdateTagMsg;
H
Haojun Liao 已提交
256 257 258 259 260 261 262 263 264 265
  tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;
  tscBuildMsg[TSDB_SQL_COMPACT_VNODE] = tscBuildCompactMsg;


  tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
  tscBuildMsg[TSDB_SQL_STABLEVGROUP] = tscBuildSTableVgroupMsg;
  tscBuildMsg[TSDB_SQL_RETRIEVE_FUNC] = tscBuildRetrieveFuncMsg;

  tscBuildMsg[TSDB_SQL_HB] = tscBuildHeartBeatMsg;
  tscBuildMsg[TSDB_SQL_SHOW] = tscBuildShowMsg;
H
Haojun Liao 已提交
266
  tscBuildMsg[TSDB_SQL_RETRIEVE_MNODE] = tscBuildRetrieveFromMgmtMsg;
H
Haojun Liao 已提交
267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283
  tscBuildMsg[TSDB_SQL_KILL_QUERY] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_STREAM] = tscBuildKillMsg;
  tscBuildMsg[TSDB_SQL_KILL_CONNECTION] = tscBuildKillMsg;

  tscProcessMsgRsp[TSDB_SQL_SELECT] = tscProcessQueryRsp;
  tscProcessMsgRsp[TSDB_SQL_FETCH] = tscProcessRetrieveRspFromNode;

  tscProcessMsgRsp[TSDB_SQL_DROP_DB] = tscProcessDropDbRsp;
  tscProcessMsgRsp[TSDB_SQL_DROP_TABLE] = tscProcessDropTableRsp;

  tscProcessMsgRsp[TSDB_SQL_USE_DB] = tscProcessUseDbRsp;
  tscProcessMsgRsp[TSDB_SQL_META] = tscProcessTableMetaRsp;
  tscProcessMsgRsp[TSDB_SQL_STABLEVGROUP] = tscProcessSTableVgroupRsp;
  tscProcessMsgRsp[TSDB_SQL_MULTI_META] = tscProcessMultiTableMetaRsp;
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_FUNC] = tscProcessRetrieveFuncRsp;

  tscProcessMsgRsp[TSDB_SQL_SHOW] = tscProcessShowRsp;
H
Haojun Liao 已提交
284
  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_MNODE] = tscProcessRetrieveRspFromNode;  // rsp handled by same function.
H
Haojun Liao 已提交
285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305
  tscProcessMsgRsp[TSDB_SQL_DESCRIBE_TABLE] = tscProcessDescribeTableRsp;

  tscProcessMsgRsp[TSDB_SQL_CURRENT_DB]   = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CURRENT_USER] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_VERSION] = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_CLI_VERSION]  = tscProcessLocalRetrieveRsp;
  tscProcessMsgRsp[TSDB_SQL_SERV_STATUS]  = tscProcessLocalRetrieveRsp;

  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;

  tscProcessMsgRsp[TSDB_SQL_RETRIEVE_GLOBALMERGE] = tscProcessRetrieveGlobalMergeRsp;

  tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
  tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;
  tscProcessMsgRsp[TSDB_SQL_COMPACT_VNODE] = tscProcessCompactRsp;

  tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_TABLE] = tscProcessShowCreateRsp;
  tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_STABLE] = tscProcessShowCreateRsp;
  tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_DATABASE] = tscProcessShowCreateRsp;
#endif

306
  buildRequestMsgFp[TSDB_SQL_CONNECT]  = buildConnectMsg;
H
Haojun Liao 已提交
307
  handleRequestRspFp[TSDB_SQL_CONNECT] = processConnectRsp;
308

H
Haojun Liao 已提交
309
  buildRequestMsgFp[TSDB_SQL_CREATE_USER]  = doBuildMsgSupp;
H
Haojun Liao 已提交
310 311 312 313
  buildRequestMsgFp[TSDB_SQL_DROP_USER]    = doBuildMsgSupp;

  buildRequestMsgFp[TSDB_SQL_CREATE_ACCT]  = doBuildMsgSupp;
  buildRequestMsgFp[TSDB_SQL_DROP_ACCT]    = doBuildMsgSupp;
314

H
Haojun Liao 已提交
315
  buildRequestMsgFp[TSDB_SQL_SHOW]         = doBuildMsgSupp;
316
  handleRequestRspFp[TSDB_SQL_SHOW]        = processShowRsp;
317

H
Haojun Liao 已提交
318 319
  buildRequestMsgFp[TSDB_SQL_RETRIEVE_MNODE] = buildRetrieveMnodeMsg;
  handleRequestRspFp[TSDB_SQL_RETRIEVE_MNODE]= processRetrieveMnodeRsp;
H
Haojun Liao 已提交
320 321 322

  buildRequestMsgFp[TSDB_SQL_CREATE_DB]      = doBuildMsgSupp;
  handleRequestRspFp[TSDB_SQL_CREATE_DB]     = processCreateDbRsp;
323 324 325 326 327 328

  buildRequestMsgFp[TSDB_SQL_USE_DB]         = doBuildMsgSupp;
  handleRequestRspFp[TSDB_SQL_USE_DB]        = processUseDbRsp;

  buildRequestMsgFp[TSDB_SQL_CREATE_TABLE]   = doBuildMsgSupp;
  handleRequestRspFp[TSDB_SQL_CREATE_TABLE]  = processCreateTableRsp;
H
Haojun Liao 已提交
329
}