clientImpl.c 14.9 KB
Newer Older
1

2
#include "clientInt.h"
3
#include "clientLog.h"
4 5
#include "tdef.h"
#include "tep.h"
6
#include "tglobal.h"
7
#include "tmsgtype.h"
8 9
#include "tnote.h"
#include "tpagedfile.h"
10
#include "tref.h"
11
#include "parser.h"
12

13 14
static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody);
H
Haojun Liao 已提交
15
static void destroyRequestMsgBody(SRequestMsgBody* pMsgBody);
H
Haojun Liao 已提交
16

17
static int32_t sendMsgToServer(void *pTransporter, SEpSet* epSet, const SRequestMsgBody *pBody, int64_t* pTransporterId);
18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36

/*
 * Checks that a C string is present and has an acceptable length.
 *
 * Returns true only when `str` is non-NULL, non-empty, and no longer
 * than `maxsize` characters (terminator not counted).
 */
static bool stringLengthCheck(const char* str, size_t maxsize) {
  if (str == NULL || str[0] == '\0') {
    return false;  // missing or empty string
  }

  return strlen(str) <= maxsize;
}

/* A user name is valid when it is non-NULL, non-empty and at most TSDB_USER_LEN - 1 characters long. */
static bool validateUserName(const char* user) {
  const size_t maxLen = TSDB_USER_LEN - 1;
  return stringLengthCheck(user, maxLen);
}

static bool validatePassword(const char* passwd) {
37
  return stringLengthCheck(passwd, TSDB_PASSWORD_LEN - 1);
38 39 40 41 42 43
}

/* A database name is valid when it is non-NULL, non-empty and at most TSDB_DB_NAME_LEN - 1 characters long. */
static bool validateDbName(const char* db) {
  const size_t maxLen = TSDB_DB_NAME_LEN - 1;
  return stringLengthCheck(db, maxLen);
}

44 45 46 47 48 49 50
/*
 * Builds the "user:auth:ip:port" key that identifies one application instance.
 *
 * Any NULL string argument is formatted as an empty field; passing NULL to
 * "%s" is undefined behaviour, and `ip` is NULL whenever the caller relies on
 * the configured endpoints. The buffer is sized from the formatted length, so
 * long inputs are never silently truncated.
 *
 * Returns a heap-allocated, NUL-terminated string that the caller must free,
 * or NULL if formatting or allocation fails.
 */
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
  const char* userStr = (user != NULL) ? user : "";
  const char* authStr = (auth != NULL) ? auth : "";
  const char* ipStr   = (ip   != NULL) ? ip   : "";

  // First pass only measures the formatted length.
  int len = snprintf(NULL, 0, "%s:%s:%s:%d", userStr, authStr, ipStr, (int) port);
  if (len < 0) {
    return NULL;
  }

  char* key = malloc((size_t) len + 1);
  if (key == NULL) {
    return NULL;
  }

  snprintf(key, (size_t) len + 1, "%s:%s:%s:%d", userStr, authStr, ipStr, (int) port);
  return key;
}

static STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, const char *db, uint16_t port, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo);
51 52

TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port) {
53 54 55 56
  if (taos_init() != TSDB_CODE_SUCCESS) {
    return NULL;
  }

57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
  if (!validateUserName(user)) {
    terrno = TSDB_CODE_TSC_INVALID_USER_LENGTH;
    return NULL;
  }

  char tmp[TSDB_DB_NAME_LEN] = {0};
  if (db != NULL) {
    if(!validateDbName(db)) {
      terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH;
      return NULL;
    }

    tstrncpy(tmp, db, sizeof(tmp));
    strdequote(tmp);
  }

73
  char secretEncrypt[32] = {0};
74 75 76 77 78 79
  if (auth == NULL) {
    if (!validatePassword(pass)) {
      terrno = TSDB_CODE_TSC_INVALID_PASS_LENGTH;
      return NULL;
    }

80
    taosEncryptPass_c((uint8_t *)pass, strlen(pass), secretEncrypt);
81 82 83 84
  } else {
    tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt));
  }

85
  SCorEpSet epSet = {0};
86 87 88 89 90 91 92 93 94 95 96 97 98 99
  if (ip) {
    if (initEpSetFromCfg(ip, NULL, &epSet) < 0) {
      return NULL;
    }

    if (port) {
      epSet.epSet.port[0] = port;
    }
  } else {
    if (initEpSetFromCfg(tsFirst, tsSecond, &epSet) < 0) {
      return NULL;
    }
  }

100 101
  char* key = getClusterKey(user, secretEncrypt, ip, port);

H
Haojun Liao 已提交
102
  SAppInstInfo** pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
103
  if (pInst == NULL) {
H
Haojun Liao 已提交
104 105
    SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo));
    p->mgmtEp       = epSet;
H
Haojun Liao 已提交
106
    p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
H
Haojun Liao 已提交
107
    taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
108

H
Haojun Liao 已提交
109
    pInst = &p;
110 111
  }

H
Haojun Liao 已提交
112 113
  tfree(key);
  return taosConnectImpl(ip, user, &secretEncrypt[0], db, port, NULL, NULL, *pInst);
114 115
}

H
Haojun Liao 已提交
116 117 118 119 120 121 122
/* Returns true when `type` is one of the message types listed in the table below. */
static bool supportedQueryType(int32_t type) {
  const int32_t accepted[] = {
      TSDB_MSG_TYPE_CREATE_USER,  TSDB_MSG_TYPE_SHOW,       TSDB_MSG_TYPE_DROP_USER,
      TSDB_MSG_TYPE_DROP_ACCT,    TSDB_MSG_TYPE_CREATE_DB,  TSDB_MSG_TYPE_CREATE_ACCT,
      TSDB_MSG_TYPE_CREATE_TABLE, TSDB_MSG_TYPE_CREATE_STB, TSDB_MSG_TYPE_USE_DB,
      TSDB_MSG_TYPE_DROP_DB,      TSDB_MSG_TYPE_DROP_STB,
  };
  const size_t numAccepted = sizeof(accepted) / sizeof(accepted[0]);

  for (size_t idx = 0; idx < numAccepted; ++idx) {
    if (accepted[idx] == type) {
      return true;
    }
  }

  return false;
}

H
Haojun Liao 已提交
123
/*
 * Parses `sql` (first `sqlLen` bytes) and, for DCL statements, sends the
 * generated message to the server and waits for the response.
 *
 * Returns NULL (with terrno set) when the statement is too long or the request
 * object cannot be created. Otherwise returns the request object; any failure
 * detected here is recorded in pRequest->code.
 */
TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
  STscObj *pTscObj = (STscObj *)taos;

  // A negative length used to be rejected only as a side effect of the unsigned comparison.
  if (sqlLen < 0 || (size_t) sqlLen > (size_t) tsMaxSQLStringLen) {
    tscError("sql string exceeds max length:%d", tsMaxSQLStringLen);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    return NULL;
  }

  nPrintTsc("%s", sql);

  SRequestObj* pRequest = createRequest(pTscObj, NULL, NULL, TSDB_SQL_SELECT);
  if (pRequest == NULL) {
    tscError("failed to malloc sqlObj");
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  pRequest->sqlstr = malloc(sqlLen + 1);
  if (pRequest->sqlstr == NULL) {
    tscError("0x%"PRIx64" failed to prepare sql string buffer", pRequest->self);

    pRequest->msgBuf = strdup("failed to prepare sql string buffer");
    terrno = pRequest->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return pRequest;
  }

  strntolower(pRequest->sqlstr, sql, (int32_t)sqlLen);
  pRequest->sqlstr[sqlLen] = 0;

  tscDebugL("0x%"PRIx64" SQL: %s", pRequest->requestId, pRequest->sqlstr);

  SParseContext cxt = {
    .ctx    = {.requestId = pRequest->requestId, .acctId = pTscObj->acctId, .db = getConnectionDB(pTscObj)},
    .pSql   = pRequest->sqlstr,
    .sqlLen = sqlLen,
    .pMsg   = pRequest->msgBuf,
    .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE
  };

  SQueryNode* pQuery = NULL;
  int32_t code = qParseQuerySql(&cxt, &pQuery);

  // Only inspect the parse result when parsing succeeded; pQuery may still be NULL otherwise.
  if (code == TSDB_CODE_SUCCESS && pQuery != NULL && qIsDclQuery(pQuery)) {
    SDclStmtInfo* pDcl = (SDclStmtInfo*) pQuery;
    pRequest->type = pDcl->msgType;
    pRequest->body.requestMsg = (SReqMsgInfo){.pMsg = pDcl->pMsg, .len = pDcl->msgLen};

    SRequestMsgBody body = buildRequestMsgImpl(pRequest);

    SEpSet* pEpSet  = &pTscObj->pAppInfo->mgmtEp.epSet;
    SEpSet  ep      = {0};
    SEpSet* pTarget = pEpSet;   // create-table is redirected to the table's vgroup below

    if (pDcl->msgType == TSDB_MSG_TYPE_CREATE_TABLE) {
      struct SCatalog* pCatalog = NULL;

      char buf[12] = {0};
      snprintf(buf, sizeof(buf), "%d", pTscObj->pAppInfo->clusterId);
      code = catalogGetHandle(buf, &pCatalog);

      if (code == 0) {
        SCreateTableMsg* pMsg = body.msgInfo.pMsg;

        SName t = {0};
        tNameFromString(&t, pMsg->name, T_NAME_ACCT|T_NAME_DB|T_NAME_TABLE);

        char db[TSDB_DB_NAME_LEN + TS_PATH_DELIMITER_LEN + TSDB_ACCT_ID_LEN] = {0};
        tNameGetFullDbName(&t, db);

        // NOTE(review): the result of catalogGetTableHashVgroup is not checked;
        // on failure `info` stays zeroed and the end point set below is empty.
        SVgroupInfo info = {0};
        catalogGetTableHashVgroup(pCatalog, pTscObj->pTransporter, pEpSet, db, tNameGetTableName(&t), &info);

        ep.inUse = info.inUse;
        ep.numOfEps = info.numOfEps;
        for (int32_t i = 0; i < ep.numOfEps; ++i) {
          ep.port[i] = info.epAddr[i].port;
          tstrncpy(ep.fqdn[i], info.epAddr[i].fqdn, tListLen(ep.fqdn[i]));
        }

        pTarget = &ep;
      }
    }

    if (code == TSDB_CODE_SUCCESS) {
      int64_t transporterId = 0;
      if (sendMsgToServer(pTscObj->pTransporter, pTarget, &body, &transporterId) == TSDB_CODE_SUCCESS) {
        tsem_wait(&pRequest->body.rspSem);
      } else {
        // Nothing was sent, so no response will ever post the semaphore.
        code = terrno;
      }
    }

    // Released on every path, including the catalog failure that used to return early.
    destroyRequestMsgBody(&body);
  }

  tfree(cxt.ctx.db);

  if (code != TSDB_CODE_SUCCESS) {
    pRequest->code = code;
  }

  return pRequest;
}

223
/*
 * Fills pEpSet->epSet from up to two "fqdn:port" end point strings.
 *
 * NULL or empty strings are skipped. Returns 0 when at least one end point
 * was stored; returns -1 with terrno = TSDB_CODE_TSC_INVALID_FQDN when a
 * string is TSDB_EP_LEN characters or longer, or when no end point was given.
 */
int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet) {
  const char *candidates[2] = {firstEp, secondEp};

  pEpSet->version = 0;

  // init mnode ip set
  SEpSet *pSet   = &(pEpSet->epSet);
  pSet->inUse    = 0;
  pSet->numOfEps = 0;

  for (int32_t k = 0; k < 2; ++k) {
    const char *ep = candidates[k];
    if (ep == NULL || ep[0] == 0) {
      continue;
    }

    if (strlen(ep) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
    }

    // Store into the next free slot, so a lone second end point lands in slot 0.
    int32_t slot = pSet->numOfEps;
    taosGetFqdnPortFromEp(ep, pSet->fqdn[slot], &(pSet->port[slot]));
    pSet->numOfEps++;
  }

  if (pSet->numOfEps == 0) {
    terrno = TSDB_CODE_TSC_INVALID_FQDN;
    return -1;
  }

  return 0;
}

259
/*
 * Creates the connection object, sends the connect message to the management
 * end points of `pAppInfo`, and waits for the server response.
 *
 * Returns the connection object on success. On failure everything created
 * here is released and NULL is returned.
 */
STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, const char *db, uint16_t port, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo) {
  STscObj *pTscObj = createTscObj(user, auth, db, pAppInfo);
  if (NULL == pTscObj) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return pTscObj;
  }

  SRequestObj *pRequest = createRequest(pTscObj, fp, param, TSDB_MSG_TYPE_CONNECT);
  if (pRequest == NULL) {
    destroyTscObj(pTscObj);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  SRequestMsgBody body = {0};
  int64_t transporterId = 0;

  bool sent = false;
  if (buildConnectMsg(pRequest, &body) == 0) {
    sent = (sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId) == TSDB_CODE_SUCCESS);
  }

  if (sent) {
    tsem_wait(&pRequest->body.rspSem);
  } else {
    // No message went out, so no response will post the semaphore; fail the
    // request instead of waiting forever. Both helpers set terrno on failure.
    pRequest->code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  destroyRequestMsgBody(&body);

  if (pRequest->code != TSDB_CODE_SUCCESS) {
    const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(terrno);
    printf("failed to connect to server, reason: %s\n\n", errorMsg);

    destroyRequest(pRequest);
    taos_close(pTscObj);
    pTscObj = NULL;
  } else {
    tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p", pTscObj->id, pTscObj->connId, pTscObj->pTransporter);
    destroyRequest(pRequest);
  }

  return pTscObj;
}

/*
 * Fills `pMsgBody` with a newly allocated connect message for `pRequest`.
 *
 * The payload carries the connection's current database plus the client's
 * pid, start time and application name. On success the payload is owned by
 * pMsgBody->msgInfo.pMsg and 0 is returned. On allocation failure terrno is
 * set to TSDB_CODE_TSC_OUT_OF_MEMORY, no payload is attached, and -1 is returned.
 */
static int32_t buildConnectMsg(SRequestObj *pRequest, SRequestMsgBody* pMsgBody) {
  pMsgBody->msgType         = TSDB_MSG_TYPE_CONNECT;
  pMsgBody->msgInfo.len     = sizeof(SConnectMsg);
  pMsgBody->requestObjRefId = pRequest->self;

  SConnectMsg *pConnect = calloc(1, sizeof(SConnectMsg));
  if (pConnect == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return -1;
  }

  STscObj *pObj = pRequest->pTscObj;

  // getConnectionDB returns a heap copy and may fail; do not copy from NULL.
  char* db = getConnectionDB(pObj);
  if (db == NULL) {
    tfree(pConnect);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return -1;
  }

  tstrncpy(pConnect->db, db, sizeof(pConnect->db));
  tfree(db);

  // pid and start time are converted to big-endian for the wire.
  pConnect->pid = htonl(appInfo.pid);
  pConnect->startTime = htobe64(appInfo.startTime);
  tstrncpy(pConnect->app, appInfo.appName, tListLen(pConnect->app));

  pMsgBody->msgInfo.pMsg = pConnect;
  return 0;
}

H
Haojun Liao 已提交
322
/* Releases the payload attached to `pMsgBody`; the body structure itself is not freed. */
static void destroyRequestMsgBody(SRequestMsgBody* pMsgBody) {
  assert(pMsgBody != NULL);

  tfree(pMsgBody->msgInfo.pMsg);
}

327
/*
 * Copies the payload of `pBody` into an RPC-allocated buffer and hands it to
 * rpcSendRequest for the given end point set.
 *
 * Returns TSDB_CODE_SUCCESS after the request has been handed over, or -1
 * with terrno = TSDB_CODE_TSC_OUT_OF_MEMORY when the RPC buffer cannot be
 * allocated (nothing is sent in that case).
 */
int32_t sendMsgToServer(void *pTransporter, SEpSet* epSet, const SRequestMsgBody *pBody, int64_t* pTransporterId) {
  char *pCont = rpcMallocCont(pBody->msgInfo.len);
  if (pCont == NULL) {
    tscError("0x%"PRIx64" msg:%s malloc failed", pBody->requestId, taosMsg[pBody->msgType]);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return -1;
  }

  memcpy(pCont, pBody->msgInfo.pMsg, pBody->msgInfo.len);

  SRpcMsg rpcMsg = {0};
  rpcMsg.msgType = pBody->msgType;
  rpcMsg.pCont   = pCont;
  rpcMsg.contLen = pBody->msgInfo.len;
  rpcMsg.ahandle = (void*) pBody->requestObjRefId;  // read back as the request ref id in processMsgFromServer
  rpcMsg.handle  = NULL;
  rpcMsg.code    = 0;

  rpcSendRequest(pTransporter, epSet, &rpcMsg, pTransporterId);
  return TSDB_CODE_SUCCESS;
}

/*
 * RPC response callback: looks up the request that `pMsg` answers, records
 * the response code, refreshes the cached management end points when the
 * server reported a different set, runs the per-type response handler on a
 * private copy of the payload, and finally wakes up the waiting thread.
 *
 * The payload buffer (pMsg->pCont) is always released here.
 */
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  int64_t requestRefId = (int64_t)pMsg->ahandle;

  SRequestObj *pRequest = (SRequestObj *)taosAcquireRef(tscReqRef, requestRefId);
  if (pRequest == NULL) {
    // The request no longer exists; just drop the payload.
    rpcFreeCont(pMsg->pCont);
    return;
  }

  assert(pRequest->self == requestRefId);
  pRequest->metric.rsp = taosGetTimestampMs();

  pRequest->code = pMsg->code;

  STscObj *pTscObj = pRequest->pTscObj;
  if (pEpSet) {
    if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, pEpSet)) {
      updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
    }
  }

  /*
   * There is no response callback function for a submit response.
   * The actual inserted number of points is the first number.
   */
  if (pMsg->code == TSDB_CODE_SUCCESS) {
    tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%"PRId64 " ms", pRequest->requestId, taosMsg[pMsg->msgType],
             tstrerror(pMsg->code), pMsg->contLen, pRequest->metric.rsp - pRequest->metric.start);
    if (handleRequestRspFp[pRequest->type]) {
      // The handler receives its own copy because pMsg->pCont is released below.
      char *p = malloc(pMsg->contLen);
      if (p == NULL) {
        pRequest->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
        terrno = pRequest->code;
      } else {
        memcpy(p, pMsg->pCont, pMsg->contLen);
        pMsg->code = (*handleRequestRspFp[pRequest->type])(pRequest, p, pMsg->contLen);
      }
    }
  } else {
    tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%"PRId64" ms", pRequest->requestId, taosMsg[pMsg->msgType],
             tstrerror(pMsg->code), pMsg->contLen, pRequest->metric.rsp - pRequest->metric.start);
  }

  rpcFreeCont(pMsg->pCont);

  // Wake the waiter while this callback still holds its reference; pRequest
  // must not be touched once the reference has been released.
  sem_post(&pRequest->body.rspSem);
  taosReleaseRef(tscReqRef, requestRefId);
}
397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422

/*
 * Connects with a pre-computed auth string instead of a plain password.
 *
 * A NULL `user` is replaced by TSDB_DEFAULT_USER. A NULL `auth` is rejected
 * and NULL is returned. Otherwise the result of taos_connect_internal is returned.
 */
TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port) {
  if (user == NULL) {
    user = TSDB_DEFAULT_USER;
  }

  // ip and db are optional; never pass NULL to a "%s" conversion.
  tscDebug("try to connect to %s:%u by auth, user:%s db:%s", (ip != NULL) ? ip : "(null)", port, user,
           (db != NULL) ? db : "(null)");

  if (auth == NULL) {
    tscError("No auth info is given, failed to connect to server");
    return NULL;
  }

  return taos_connect_internal(ip, user, NULL, auth, db, port);
}

/*
 * Copies at most `srcLen` characters of `src` into `dst` (capacity `dstSize`,
 * always NUL-terminated). A non-positive length copies nothing.
 * Returns `dst`, or NULL when `src` is NULL so that "not given" is preserved.
 */
static const char* copyLengthDelimitedArg(char* dst, size_t dstSize, const char* src, int srcLen) {
  if (src == NULL) {
    return NULL;
  }

  size_t n = (srcLen > 0) ? (size_t) srcLen : 0;
  if (n > dstSize - 1) {
    n = dstSize - 1;
  }

  strncpy(dst, src, n);
  dst[n] = '\0';
  return dst;
}

/*
 * Variant of taos_connect for callers whose strings come with explicit
 * lengths and may not be NUL-terminated.
 *
 * Each argument is copied into a bounded, NUL-terminated local buffer before
 * being handed to taos_connect. NULL arguments are forwarded as NULL and
 * negative lengths are treated as zero.
 */
TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen, const char *db, int dbLen, uint16_t port) {
  char ipStr[TSDB_EP_LEN]         = {0};
  char dbStr[TSDB_DB_NAME_LEN]    = {0};
  char userStr[TSDB_USER_LEN]     = {0};
  char passStr[TSDB_PASSWORD_LEN] = {0};

  const char* pIp   = copyLengthDelimitedArg(ipStr,   sizeof(ipStr),   ip,   ipLen);
  const char* pUser = copyLengthDelimitedArg(userStr, sizeof(userStr), user, userLen);
  const char* pPass = copyLengthDelimitedArg(passStr, sizeof(passStr), pass, passLen);
  const char* pDb   = copyLengthDelimitedArg(dbStr,   sizeof(dbStr),   db,   dbLen);

  return taos_connect(pIp, pUser, pPass, pDb, port);
}

/*
 * Returns the next row of the result set held in `pRequest`, or NULL when no
 * more rows are available.
 *
 * When the local block is missing or exhausted, a TSDB_MSG_TYPE_SHOW_RETRIEVE
 * request is sent to the management end points and the call blocks until the
 * response arrives. For variable-length columns the row pointer is advanced to
 * the value and its length is stored in pResultInfo->length.
 */
void* doFetchRow(SRequestObj* pRequest) {
  assert(pRequest != NULL);
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;

  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
    pRequest->type = TSDB_MSG_TYPE_SHOW_RETRIEVE;

    SRequestMsgBody body = buildRequestMsgImpl(pRequest);

    int64_t transporterId = 0;
    STscObj* pTscObj = pRequest->pTscObj;
    if (sendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &body, &transporterId) != TSDB_CODE_SUCCESS) {
      // Nothing was sent, so no response will post the semaphore; do not wait.
      pRequest->code = terrno;
      destroyRequestMsgBody(&body);
      return NULL;
    }

    tsem_wait(&pRequest->body.rspSem);
    destroyRequestMsgBody(&body);

    pResultInfo->current = 0;
    if (pResultInfo->numOfRows <= pResultInfo->current) {
      return NULL;
    }
  }

  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
    // Each column is stored contiguously; step to the current row inside it.
    pResultInfo->row[i] = pResultInfo->pCol[i] + pResultInfo->fields[i].bytes * pResultInfo->current;
    if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
      pResultInfo->length[i] = varDataLen(pResultInfo->row[i]);
      pResultInfo->row[i] = varDataVal(pResultInfo->row[i]);
    }
  }

  pResultInfo->current += 1;
  return pResultInfo->row;
}

459
/*
 * Points every column of `pResultInfo` at its data inside pResultInfo->pData.
 *
 * Column i starts at (sum of the byte widths of columns 0..i-1) multiplied by
 * pResultInfo->numOfRows. Does nothing when `numOfRows` is 0.
 *
 * NOTE(review): widths are read from pResultInfo->fields, not from `pFields`,
 * and the stride uses pResultInfo->numOfRows, not the `numOfRows` argument —
 * presumably the caller keeps them in sync; confirm.
 */
void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) {
  assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL);
  if (numOfRows == 0) {
    return;
  }

  int32_t widthSoFar = 0;  // combined byte width of the columns already placed
  for (int32_t col = 0; col < numOfCols; ++col) {
    char* colStart = (char*) (pResultInfo->pData + widthSoFar * pResultInfo->numOfRows);

    pResultInfo->length[col] = pResultInfo->fields[col].bytes;
    pResultInfo->row[col]    = colStart;
    pResultInfo->pCol[col]   = colStart;

    widthSoFar += pResultInfo->fields[col].bytes;
  }
}

474 475 476 477 478
/*
 * Returns a heap-allocated copy of the connection's current database name,
 * taken while holding the connection mutex. The caller frees the result;
 * NULL is returned when strndup fails.
 */
char* getConnectionDB(STscObj* pObj) {
  pthread_mutex_lock(&pObj->mutex);
  char *dbCopy = strndup(pObj->db, tListLen(pObj->db));
  pthread_mutex_unlock(&pObj->mutex);

  return dbCopy;
}

/* Replaces the connection's current database name with `db`, under the connection mutex. */
void setConnectionDB(STscObj* pTscObj, const char* db) {
  assert(db != NULL && pTscObj != NULL);

  pthread_mutex_t *pLock = &pTscObj->mutex;
  pthread_mutex_lock(pLock);
  tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
  pthread_mutex_unlock(pLock);
}
S
Shengliang Guan 已提交
489