clientImpl.c 18.2 KB
Newer Older
1

2
#include "clientInt.h"
3
#include "clientLog.h"
4 5
#include "tdef.h"
#include "tep.h"
6
#include "tglobal.h"
7
#include "tmsgtype.h"
8 9
#include "tnote.h"
#include "tpagedfile.h"
10
#include "tref.h"
11
#include "parser.h"
X
Xiaoyu Wang 已提交
12 13 14 15
#include "planner.h"
#include "scheduler.h"

/*
 * Evaluate `expr` exactly once. When the result differs from
 * TSDB_CODE_SUCCESS, store it in terrno and jump to `label`.
 *
 * The temporary uses a name that callers are unlikely to use themselves, so
 * an invocation such as CHECK_CODE_GOTO(code, _return) no longer expands to
 * the self-initialisation `int32_t code = code;`.
 */
#define CHECK_CODE_GOTO(expr, label)       \
  do {                                     \
    int32_t chkCode_ = (expr);             \
    if (TSDB_CODE_SUCCESS != chkCode_) {   \
      terrno = chkCode_;                   \
      goto label;                          \
    }                                      \
  } while (0)
23

24
static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
25 26
static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest);
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
H
Haojun Liao 已提交
27

28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
/*
 * Returns true only when `str` is non-NULL, non-empty and its length does not
 * exceed `maxsize`; returns false otherwise.
 */
static bool stringLengthCheck(const char* str, size_t maxsize) {
  if (NULL == str) {
    return false;
  }

  const size_t n = strlen(str);
  return (n >= 1) && (n <= maxsize);
}

/* A user name is valid when it is non-empty and at most TSDB_USER_LEN - 1 characters long. */
static bool validateUserName(const char* user) {
  const size_t maxLen = TSDB_USER_LEN - 1;
  return stringLengthCheck(user, maxLen);
}

static bool validatePassword(const char* passwd) {
46
  return stringLengthCheck(passwd, TSDB_PASSWORD_LEN - 1);
47 48 49 50 51 52
}

/* A database name is valid when it is non-empty and at most TSDB_DB_NAME_LEN - 1 characters long. */
static bool validateDbName(const char* db) {
  const size_t maxLen = TSDB_DB_NAME_LEN - 1;
  return stringLengthCheck(db, maxLen);
}

53 54 55 56 57 58 59
/*
 * Build the "user:auth:ip:port" key and return it as a heap-allocated string
 * that the caller must free. Returns NULL when the allocation fails.
 *
 * Any NULL string argument is rendered as an empty field: passing NULL to a
 * "%s" conversion is undefined behaviour, and `ip` is NULL whenever the caller
 * relies on the configured endpoints instead of an explicit address.
 * The key is truncated to fit the 512-byte scratch buffer.
 */
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
  char key[512] = {0};
  snprintf(key, sizeof(key), "%s:%s:%s:%d",
           (user != NULL) ? user : "",
           (auth != NULL) ? auth : "",
           (ip   != NULL) ? ip   : "",
           (int)port);

  const size_t len = strlen(key);
  char* copy = malloc(len + 1);
  if (copy != NULL) {
    memcpy(copy, key, len + 1);  // includes the terminating NUL
  }

  return copy;
}

static STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, const char *db, uint16_t port, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo);
60 61

TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port) {
62 63 64 65
  if (taos_init() != TSDB_CODE_SUCCESS) {
    return NULL;
  }

66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
  if (!validateUserName(user)) {
    terrno = TSDB_CODE_TSC_INVALID_USER_LENGTH;
    return NULL;
  }

  char tmp[TSDB_DB_NAME_LEN] = {0};
  if (db != NULL) {
    if(!validateDbName(db)) {
      terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH;
      return NULL;
    }

    tstrncpy(tmp, db, sizeof(tmp));
    strdequote(tmp);
  }

82
  char secretEncrypt[32] = {0};
83 84 85 86 87 88
  if (auth == NULL) {
    if (!validatePassword(pass)) {
      terrno = TSDB_CODE_TSC_INVALID_PASS_LENGTH;
      return NULL;
    }

89
    taosEncryptPass_c((uint8_t *)pass, strlen(pass), secretEncrypt);
90 91 92 93
  } else {
    tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt));
  }

94
  SCorEpSet epSet = {0};
95 96 97 98 99 100 101 102 103 104 105 106 107 108
  if (ip) {
    if (initEpSetFromCfg(ip, NULL, &epSet) < 0) {
      return NULL;
    }

    if (port) {
      epSet.epSet.port[0] = port;
    }
  } else {
    if (initEpSetFromCfg(tsFirst, tsSecond, &epSet) < 0) {
      return NULL;
    }
  }

109 110
  char* key = getClusterKey(user, secretEncrypt, ip, port);

H
Haojun Liao 已提交
111
  SAppInstInfo** pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
112
  if (pInst == NULL) {
H
Haojun Liao 已提交
113 114
    SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo));
    p->mgmtEp       = epSet;
H
Haojun Liao 已提交
115
    p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
H
Haojun Liao 已提交
116
    taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
117

H
Haojun Liao 已提交
118
    pInst = &p;
119 120
  }

H
Haojun Liao 已提交
121 122
  tfree(key);
  return taosConnectImpl(ip, user, &secretEncrypt[0], db, port, NULL, NULL, *pInst);
123 124
}

X
Xiaoyu Wang 已提交
125 126 127
/*
 * Create a request object for `pTscObj` and attach a lower-cased,
 * NUL-terminated copy of the first `sqlLen` bytes of `sql` to it.
 *
 * On success *pRequest holds the new request and TSDB_CODE_SUCCESS is
 * returned. Returns TSDB_CODE_TSC_OUT_OF_MEMORY when either the request or
 * the SQL buffer cannot be allocated; in the latter case *pRequest is still
 * set and carries an error message in msgBuf.
 */
int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj** pRequest) {
  *pRequest = createRequest(pTscObj, NULL, NULL, TSDB_SQL_SELECT);
  if (*pRequest == NULL) {
    tscError("failed to malloc sqlObj");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  (*pRequest)->sqlstr = malloc(sqlLen + 1);
  if ((*pRequest)->sqlstr == NULL) {
    tscError("0x%"PRIx64" failed to prepare sql string buffer", (*pRequest)->self);
    (*pRequest)->msgBuf = strdup("failed to prepare sql string buffer");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen);
  (*pRequest)->sqlstr[sqlLen] = 0;
  (*pRequest)->sqlLen = sqlLen;

  // The '%' before the second PRIx64 was missing, so the request id was never
  // formatted and the literal conversion letters were printed instead.
  tscDebugL("0x%"PRIx64" SQL: %s, reqId:0x%"PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId);
  return TSDB_CODE_SUCCESS;
}
146

X
Xiaoyu Wang 已提交
147
/*
 * Parse the SQL text stored in `pRequest` into `*pQuery`.
 *
 * A parse context is filled from the request and its connection, the catalog
 * handle for the connection's cluster is fetched, and the parser is run.
 * The database name obtained from getConnectionDB() is released before
 * returning on every path. Returns the catalog or parser result code.
 */
int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery) {
  STscObj* pConn = pRequest->pTscObj;

  SParseContext cxt = {
    .ctx = {.requestId = pRequest->requestId, .acctId = pConn->acctId, .db = getConnectionDB(pConn), .pTransporter = pConn->pTransporter},
    .pSql   = pRequest->sqlstr,
    .sqlLen = pRequest->sqlLen,
    .pMsg   = pRequest->msgBuf,
    .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE
  };

  cxt.ctx.mgmtEpSet = getEpSet_s(&pConn->pAppInfo->mgmtEp);

  int32_t code = catalogGetHandle(pConn->pAppInfo->clusterId, &cxt.ctx.pCatalog);
  if (TSDB_CODE_SUCCESS == code) {
    // Only parse once the catalog handle is available.
    code = qParseQuerySql(&cxt, pQuery);
  }

  tfree(cxt.ctx.db);
  return code;
}
H
Haojun Liao 已提交
170

X
Xiaoyu Wang 已提交
171 172 173
/*
 * Send the already-serialized message of a DDL/DCL statement and block until
 * the response semaphore is posted.
 *
 * TDMT_VND_CREATE_TABLE and TDMT_VND_SHOW_TABLES messages go to the endpoint
 * set carried by the statement; every other message type goes to the
 * management endpoint set of the connection. For TDMT_VND_SHOW_TABLES the
 * statement's extension array is attached to the request's show info the
 * first time through. Always returns TSDB_CODE_SUCCESS.
 */
int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
  SDclStmtInfo* pStmt = (SDclStmtInfo*)pQuery;

  pRequest->type = pStmt->msgType;
  pRequest->body.requestMsg = (SDataBuf){.pData = pStmt->pMsg, .len = pStmt->msgLen};

  STscObj*      pConn = pRequest->pTscObj;
  SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
  int64_t       transporterId = 0;

  if (TDMT_VND_SHOW_TABLES == pStmt->msgType) {
    SShowReqInfo* pShowInfo = &pRequest->body.showInfo;
    if (NULL == pShowInfo->pArray) {
      pShowInfo->currentIndex = 0;
      pShowInfo->pArray = pStmt->pExtension;
    }
  }

  if (TDMT_VND_CREATE_TABLE == pStmt->msgType || TDMT_VND_SHOW_TABLES == pStmt->msgType) {
    asyncSendMsgToServer(pConn->pTransporter, &pStmt->epSet, &transporterId, pSendMsg);
  } else {
    SEpSet* pMgmtEpSet = &pConn->pAppInfo->mgmtEp.epSet;
    asyncSendMsgToServer(pConn->pTransporter, pMgmtEpSet, &transporterId, pSendMsg);
  }

  tsem_wait(&pRequest->body.rspSem);
  destroySendMsgInfo(pSendMsg);
  return TSDB_CODE_SUCCESS;
}
X
Xiaoyu Wang 已提交
198

X
Xiaoyu Wang 已提交
199 200
/*
 * Record the query type on the request and build the query DAG for `pQuery`.
 * Returns the result code of qCreateQueryDag().
 */
int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQuery, SQueryDag** pDag) {
  pRequest->type = pQuery->type;

  const int32_t code = qCreateQueryDag(pQuery, pDag, pRequest->requestId);
  return code;
}

X
Xiaoyu Wang 已提交
204
/*
 * Hand the query DAG to the scheduler.
 *
 * Requests of type TSDB_SQL_INSERT and TSDB_SQL_CREATE_TABLE are executed via
 * scheduleExecJob(), and the reported row count is stored in
 * pRequest->affectedRows. All other requests are submitted with
 * scheduleAsyncExecJob().
 *
 * Returns the code reported in the query result when it is non-zero,
 * otherwise the scheduler's own return code. The latter used to be dropped,
 * which hid failures that never filled in the result.
 */
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) {
  if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) {
    SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
    int32_t code = scheduleExecJob(pRequest->pTscObj->pTransporter, NULL, pDag, pJob, &res);
    pRequest->affectedRows = res.numOfRows;

    if (res.code != 0) {
      return res.code;
    }
    return code;
  }

  return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob);
}
X
Xiaoyu Wang 已提交
214

L
Liu Jicong 已提交
215 216 217 218 219 220 221 222 223 224
/*
 * Create a topic named `name` whose plan is, for now, the raw SQL text.
 *
 * A request is built from `sql`, a create-topic message is serialized into a
 * freshly allocated buffer, sent to the management endpoint set, and the call
 * blocks until the response semaphore is posted.
 *
 * Returns the request object (NULL when it could not be created). When terrno
 * is set on exit it is copied into the request's code.
 */
TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen) {
  STscObj*      pTscObj = (STscObj*)taos;
  SRequestObj*  pRequest = NULL;
  SQueryNode*   pQuery = NULL;
  SQueryDag*    pDag = NULL;
  // Initialised up front: the error paths jump to _return before the message
  // is built, and the cleanup code must not see an indeterminate pointer.
  SMsgSendInfo* body = NULL;
  char *dagStr = NULL;

  terrno = TSDB_CODE_SUCCESS;

  CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);

//temporary disabled until planner ready
#if 0
  CHECK_CODE_GOTO(parseSql(pRequest, &pQuery), _return);
  //TODO: check sql valid

  CHECK_CODE_GOTO(qCreateQueryDag(pQuery, &pDag), _return);

  dagStr = qDagToString(pDag);
  if(dagStr == NULL) {
    //TODO
  }
#endif

  SCMCreateTopicReq req = {
    .name = (char*)name,
    .igExists = 0,
    /*.physicalPlan = dagStr,*/
    .physicalPlan = (char*)sql,
    .logicalPlan = "",
  };

  // First pass with a NULL buffer yields the serialized length.
  int tlen = tSerializeSCMCreateTopicReq(NULL, &req);
  void* buf = malloc(tlen);
  if(buf == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    goto _return;
  }
  void* abuf = buf;
  tSerializeSCMCreateTopicReq(&abuf, &req);
  /*printf("formatted: %s\n", dagStr);*/

  pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen };

  body = buildMsgInfoImpl(pRequest);
  if (body == NULL) {
    if (terrno == TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    }
    goto _return;
  }

  SEpSet* pEpSet = &pTscObj->pAppInfo->mgmtEp.epSet;

  int64_t transporterId = 0;
  asyncSendMsgToServer(pTscObj->pTransporter, pEpSet, &transporterId, body);

  tsem_wait(&pRequest->body.rspSem);

_return:
  qDestroyQuery(pQuery);
  qDestroyQueryDag(pDag);
  if (body != NULL) {
    destroySendMsgInfo(body);
  }
  if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
    pRequest->code = terrno;
  }
  return pRequest;
}

X
Xiaoyu Wang 已提交
276 277 278 279 280 281
/*
 * Execute the first `sqlLen` bytes of `sql` on the connection `taos`.
 *
 * The statement is rejected with TSDB_CODE_TSC_EXCEED_SQL_LIMIT when it is
 * longer than tsMaxSQLStringLen. Otherwise a request is built and parsed;
 * DDL statements are sent directly, all other statements are planned and
 * handed to the scheduler.
 *
 * Returns the request object, or NULL when the length check fails or the
 * request could not be created. When terrno is set on exit it is copied into
 * the request's code.
 */
TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
  STscObj *pConn = (STscObj *)taos;
  if (sqlLen > (size_t) tsMaxSQLStringLen) {
    tscError("sql string exceeds max length:%d", tsMaxSQLStringLen);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    return NULL;
  }

  nPrintTsc("%s", sql)

  SRequestObj* pReq = NULL;
  SQueryNode*  pParsed = NULL;
  SQueryDag*   pPlan = NULL;
  void*        pJob = NULL;

  terrno = TSDB_CODE_SUCCESS;
  CHECK_CODE_GOTO(buildRequest(pConn, sql, sqlLen, &pReq), _cleanup);
  CHECK_CODE_GOTO(parseSql(pReq, &pParsed), _cleanup);

  if (!qIsDdlQuery(pParsed)) {
    CHECK_CODE_GOTO(getPlan(pReq, pParsed, &pPlan), _cleanup);
    CHECK_CODE_GOTO(scheduleQuery(pReq, pPlan, &pJob), _cleanup);
  } else {
    CHECK_CODE_GOTO(execDdlQuery(pReq, pParsed), _cleanup);
  }

_cleanup:
  qDestroyQuery(pParsed);
  qDestroyQueryDag(pPlan);
  if (TSDB_CODE_SUCCESS != terrno && NULL != pReq) {
    pReq->code = terrno;
  }
  return pReq;
}

311
/*
 * Fill `pEpSet` from up to two "fqdn:port" endpoint strings.
 *
 * NULL or empty endpoint strings are skipped; the remaining ones are stored
 * in order. Returns 0 on success. Returns -1 with terrno set to
 * TSDB_CODE_TSC_INVALID_FQDN when an endpoint string is TSDB_EP_LEN
 * characters or longer, or when no endpoint was usable.
 */
int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet) {
  pEpSet->version = 0;

  // init mnode ip set
  SEpSet *pSet   = &(pEpSet->epSet);
  pSet->numOfEps = 0;
  pSet->inUse    = 0;

  const char *candidates[2] = {firstEp, secondEp};
  for (int32_t i = 0; i < 2; ++i) {
    const char *ep = candidates[i];
    if (ep == NULL || ep[0] == 0) {
      continue;
    }

    if (strlen(ep) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
    }

    // Each accepted endpoint takes the next free slot.
    const int32_t slot = pSet->numOfEps;
    taosGetFqdnPortFromEp(ep, pSet->fqdn[slot], &(pSet->port[slot]));
    pSet->numOfEps++;
  }

  if (0 == pSet->numOfEps) {
    terrno = TSDB_CODE_TSC_INVALID_FQDN;
    return -1;
  }

  return 0;
}

347
/*
 * Create the connection object, send a TDMT_MND_CONNECT request to the
 * management endpoint set and wait for the response.
 *
 * Returns the connection object on success. On failure returns NULL; terrno
 * is set to TSDB_CODE_TSC_OUT_OF_MEMORY when the connection or request object
 * cannot be created, and is left as set by buildConnectMsg() when the connect
 * message cannot be built.
 */
STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, const char *db, uint16_t port, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo) {
  STscObj *pTscObj = createTscObj(user, auth, db, pAppInfo);
  if (NULL == pTscObj) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return pTscObj;
  }

  SRequestObj *pRequest = createRequest(pTscObj, fp, param, TDMT_MND_CONNECT);
  if (pRequest == NULL) {
    destroyTscObj(pTscObj);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  SMsgSendInfo* body = buildConnectMsg(pRequest);
  if (body == NULL) {
    // Nothing to send, and destroySendMsgInfo() asserts on NULL, so unwind
    // the same way as the connect-failure path below.
    destroyRequest(pRequest);
    taos_close(pTscObj);
    return NULL;
  }

  int64_t transporterId = 0;
  asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);

  tsem_wait(&pRequest->body.rspSem);
  destroySendMsgInfo(body);

  if (pRequest->code != TSDB_CODE_SUCCESS) {
    const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(terrno);
    printf("failed to connect to server, reason: %s\n\n", errorMsg);

    destroyRequest(pRequest);
    taos_close(pTscObj);
    pTscObj = NULL;
  } else {
    tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p, reqId:0x%"PRIx64, pTscObj->id, pTscObj->connId, pTscObj->pTransporter, pRequest->requestId);
    destroyRequest(pRequest);
  }

  return pTscObj;
}

384 385 386 387 388 389 390
/*
 * Allocate and fill the send-info wrapper and the SConnectMsg payload for a
 * TDMT_MND_CONNECT request.
 *
 * The payload carries the connection's current database name, the process id,
 * the start time and the application name taken from appInfo.
 *
 * Returns the send info, to be released with destroySendMsgInfo(), or NULL
 * with terrno set to TSDB_CODE_TSC_OUT_OF_MEMORY when an allocation fails.
 */
static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest) {
  SMsgSendInfo *pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  pMsgSendInfo->msgType         = TDMT_MND_CONNECT;
  pMsgSendInfo->msgInfo.len     = sizeof(SConnectMsg);
  pMsgSendInfo->requestObjRefId = pRequest->self;
  pMsgSendInfo->requestId       = pRequest->requestId;
  pMsgSendInfo->fp              = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)];
  pMsgSendInfo->param           = pRequest;

  SConnectMsg *pConnect = calloc(1, sizeof(SConnectMsg));
  if (pConnect == NULL) {
    tfree(pMsgSendInfo);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  STscObj *pObj = pRequest->pTscObj;

  // getConnectionDB() returns a heap copy and can therefore fail; do not pass
  // a NULL source to tstrncpy.
  char* db = getConnectionDB(pObj);
  if (db == NULL) {
    tfree(pConnect);
    tfree(pMsgSendInfo);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  tstrncpy(pConnect->db, db, sizeof(pConnect->db));
  tfree(db);

  // pid and startTime are stored in network byte order.
  pConnect->pid = htonl(appInfo.pid);
  pConnect->startTime = htobe64(appInfo.startTime);
  tstrncpy(pConnect->app, appInfo.appName, tListLen(pConnect->app));

  pMsgSendInfo->msgInfo.pData = pConnect;
  return pMsgSendInfo;
}

419
/*
 * Release a send-info wrapper together with the message payload it points to.
 * `pMsgBody` must not be NULL (checked with assert).
 */
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
  assert(NULL != pMsgBody);

  // Payload first, then the wrapper that holds the pointer to it.
  tfree(pMsgBody->msgInfo.pData);
  tfree(pMsgBody);
}

425
/*
 * Handle a response message delivered for a previously sent request.
 *
 * When the send info references a request object, the response time and code
 * are recorded on it, the management endpoint set is refreshed if the message
 * carries a different one, and the outcome is logged. The response body is
 * then copied into a new buffer and passed to the send info's callback
 * together with the response code. Finally the transport buffer is released.
 *
 * NOTE(review): whether the callback takes ownership of the copied buffer is
 * not visible here — confirm against the handlers.
 */
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  SMsgSendInfo *pSendInfo = (SMsgSendInfo *) pMsg->ahandle;
  assert(pMsg->ahandle != NULL);

  if (pSendInfo->requestObjRefId != 0) {
    SRequestObj *pRequest = (SRequestObj *)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
    assert(pRequest->self == pSendInfo->requestObjRefId);

    pRequest->metric.rsp = taosGetTimestampMs();
    pRequest->code = pMsg->code;

    STscObj *pTscObj = pRequest->pTscObj;
    if (pEpSet) {
      if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, pEpSet)) {
        updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
      }
    }

    /*
     * There is not response callback function for submit response.
     * The actual inserted number of points is the first number.
     */
    int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start;
    if (pMsg->code == TSDB_CODE_SUCCESS) {
      tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%"PRIx64, pRequest->self,
          TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
    } else {
      tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%"PRIx64, pRequest->self,
          TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
    }

    taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
  }

  SDataBuf buf = {.len = pMsg->contLen};
  buf.pData = NULL;

  // Only allocate for a non-empty body: a zero-size calloc may return NULL,
  // which must not be mistaken for an out-of-memory condition.
  if (pMsg->contLen > 0) {
    buf.pData = calloc(1, pMsg->contLen);
    if (buf.pData == NULL) {
      terrno     = TSDB_CODE_OUT_OF_MEMORY;
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    }
  }

  pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
  rpcFreeCont(pMsg->pCont);
}
471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496

/*
 * Connect using a pre-computed auth string instead of a password.
 *
 * A NULL `user` is replaced by TSDB_DEFAULT_USER. A NULL `auth` is rejected:
 * an error is logged and NULL is returned. Otherwise the result of
 * taos_connect_internal() is returned.
 */
TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port) {
  tscDebug("try to connect to %s:%u by auth, user:%s db:%s", ip, port, user, db);

  if (NULL == auth) {
    tscError("No auth info is given, failed to connect to server");
    return NULL;
  }

  const char *effectiveUser = (NULL != user) ? user : TSDB_DEFAULT_USER;
  return taos_connect_internal(ip, effectiveUser, NULL, auth, db, port);
}

/*
 * Copy at most `srcLen` characters of `src` into `dst`, never writing more
 * than `dstSize - 1` characters, and NUL-terminate the result. A NULL source
 * or a non-positive length leaves `dst` untouched.
 */
static void copyLengthBoundedStr(char *dst, size_t dstSize, const char *src, int srcLen) {
  if (dst == NULL || dstSize == 0 || src == NULL || srcLen <= 0) {
    return;
  }

  size_t n = (size_t)srcLen;
  if (n > dstSize - 1) {
    n = dstSize - 1;
  }

  strncpy(dst, src, n);  // stops early at a NUL inside src
  dst[n] = '\0';
}

/*
 * Length-delimited variant of taos_connect(): each string argument comes with
 * an explicit length and need not be NUL-terminated. The arguments are copied
 * into bounded local buffers and forwarded to taos_connect().
 *
 * NULL strings and non-positive lengths yield empty strings. Previously a
 * NULL string was handed to strncpy, and a negative length was converted to a
 * huge unsigned copy count.
 */
TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen, const char *db, int dbLen, uint16_t port) {
  char ipStr[TSDB_EP_LEN]      = {0};
  char dbStr[TSDB_DB_NAME_LEN] = {0};
  char userStr[TSDB_USER_LEN]  = {0};
  char passStr[TSDB_PASSWORD_LEN]   = {0};

  copyLengthBoundedStr(ipStr,   sizeof(ipStr),   ip,   ipLen);
  copyLengthBoundedStr(userStr, sizeof(userStr), user, userLen);
  copyLengthBoundedStr(passStr, sizeof(passStr), pass, passLen);
  copyLengthBoundedStr(dbStr,   sizeof(dbStr),   db,   dbLen);
  return taos_connect(ipStr, userStr, passStr, dbStr, port);
}

/*
 * Build the message for the request's current type, send it to the management
 * endpoint set of the connection, wait for the response semaphore and release
 * the send info.
 */
static void sendFetchMsgAndWait(SRequestObj* pRequest) {
  SMsgSendInfo* body = buildMsgInfoImpl(pRequest);

  int64_t  transporterId = 0;
  STscObj *pTscObj = pRequest->pTscObj;
  asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);

  tsem_wait(&pRequest->body.rspSem);
  destroySendMsgInfo(body);
}

/*
 * Return the next row of the request's result set, or NULL when there are no
 * more rows.
 *
 * When the buffered rows are exhausted (or nothing is buffered yet) the
 * request type is advanced and another message is sent before reading:
 *   - TDMT_MND_SHOW         -> TDMT_MND_SHOW_RETRIEVE
 *   - TDMT_VND_SHOW_TABLES  -> TDMT_VND_SHOW_TABLES_FETCH
 *   - TDMT_VND_SHOW_TABLES_FETCH: move on to the next entry of the show-info
 *     array (NULL is returned once it is exhausted), issue a
 *     TDMT_VND_SHOW_TABLES request for it, then fetch again.
 *
 * For variable-length columns the returned row pointers address the value and
 * the matching length entry is updated.
 */
void* doFetchRow(SRequestObj* pRequest) {
  assert(pRequest != NULL);
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;

  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
    if (pRequest->type == TDMT_MND_SHOW) {
      pRequest->type = TDMT_MND_SHOW_RETRIEVE;
    } else if (pRequest->type == TDMT_VND_SHOW_TABLES) {
      pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
    } else if (pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) {
      pRequest->type = TDMT_VND_SHOW_TABLES;
      SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
      pShowReqInfo->currentIndex += 1;
      if (pShowReqInfo->currentIndex >= taosArrayGetSize(pShowReqInfo->pArray)) {
        return NULL;
      }

      SVgroupInfo* pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex);
      SVShowTablesReq* pShowReq = calloc(1, sizeof(SVShowTablesReq));
      if (pShowReq == NULL) {
        // Previously dereferenced without a check.
        terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
        pRequest->code = terrno;
        return NULL;
      }
      pShowReq->head.vgId = htonl(pVgroupInfo->vgId);

      pRequest->body.requestMsg.len = sizeof(SVShowTablesReq);
      pRequest->body.requestMsg.pData = pShowReq;

      sendFetchMsgAndWait(pRequest);

      pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
    }

    sendFetchMsgAndWait(pRequest);

    pResultInfo->current = 0;
    if (pResultInfo->numOfRows <= pResultInfo->current) {
      return NULL;
    }
  }

  for(int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
    // Column data is laid out column by column; step to the current row.
    pResultInfo->row[i] = pResultInfo->pCol[i] + pResultInfo->fields[i].bytes * pResultInfo->current;
    if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
      pResultInfo->length[i] = varDataLen(pResultInfo->row[i]);
      pResultInfo->row[i] = varDataVal(pResultInfo->row[i]);
    }
  }

  pResultInfo->current += 1;
  return pResultInfo->row;
}

562
/*
 * Point the per-column row and column-start pointers of `pResultInfo` at the
 * beginning of each column inside pData, and reset each column length to the
 * field width.
 *
 * Does nothing when `numOfRows` is 0. Column i starts at
 * (sum of the widths of columns 0..i-1) * pResultInfo->numOfRows bytes into
 * pData. Field widths and the row count are read from `pResultInfo` itself.
 */
void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) {
  assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL);
  if (0 == numOfRows) {
    return;
  }

  int32_t widthSoFar = 0;  // combined width of all preceding columns
  int32_t col = 0;
  while (col < numOfCols) {
    pResultInfo->length[col] = pResultInfo->fields[col].bytes;
    pResultInfo->row[col]    = (char*) (pResultInfo->pData + widthSoFar * pResultInfo->numOfRows);
    pResultInfo->pCol[col]   = pResultInfo->row[col];

    widthSoFar += pResultInfo->fields[col].bytes;
    ++col;
  }
}

577 578 579 580 581
/*
 * Return a heap-allocated copy of the connection's current database name,
 * taken while holding the connection mutex. The caller frees the result.
 * The result is whatever strndup() returns, so it may be NULL.
 */
char* getConnectionDB(STscObj* pObj) {
  pthread_mutex_lock(&pObj->mutex);
  char *dbCopy = strndup(pObj->db, tListLen(pObj->db));
  pthread_mutex_unlock(&pObj->mutex);

  return dbCopy;
}

/*
 * Replace the connection's current database name with `db`, copying it into
 * the fixed-size db field while holding the connection mutex.
 * Both arguments must be non-NULL (checked with assert).
 */
void setConnectionDB(STscObj* pTscObj, const char* db) {
  assert(pTscObj != NULL);
  assert(db != NULL);

  pthread_mutex_lock(&pTscObj->mutex);
  tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
  pthread_mutex_unlock(&pTscObj->mutex);
}
S
Shengliang Guan 已提交
592