clientImpl.c 20.2 KB
Newer Older
1

2
#include "clientInt.h"
3
#include "clientLog.h"
4 5 6
#include "parser.h"
#include "planner.h"
#include "scheduler.h"
7 8
#include "tdef.h"
#include "tep.h"
9
#include "tglobal.h"
10
#include "tmsgtype.h"
11 12
#include "tnote.h"
#include "tpagedfile.h"
13
#include "tref.h"
X
Xiaoyu Wang 已提交
14

15
static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
16 17
static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest);
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
18
static void setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp);
H
Haojun Liao 已提交
19

20
/*
 * Check that a string is present, non-empty and at most maxsize characters long.
 *
 * str:     string to check; NULL is rejected.
 * maxsize: maximum accepted length, not counting the terminating NUL.
 *
 * Returns true when 1 <= strlen(str) <= maxsize, false otherwise.
 */
static bool stringLengthCheck(const char* str, size_t maxsize) {
  if (str == NULL) {
    return false;
  }

  // size_t is unsigned, so "empty" is the only lower-bound violation possible
  size_t len = strlen(str);
  return len != 0 && len <= maxsize;
}

/* A user name is valid when it is non-empty and fits in TSDB_USER_LEN including the NUL. */
static bool validateUserName(const char* user) {
  const size_t maxUserLen = TSDB_USER_LEN - 1;
  return stringLengthCheck(user, maxUserLen);
}

static bool validatePassword(const char* passwd) {
38
  return stringLengthCheck(passwd, TSDB_PASSWORD_LEN - 1);
39 40 41 42 43 44
}

/* A database name is valid when it is non-empty and fits in TSDB_DB_NAME_LEN including the NUL. */
static bool validateDbName(const char* db) {
  const size_t maxDbLen = TSDB_DB_NAME_LEN - 1;
  return stringLengthCheck(db, maxDbLen);
}

45 46 47 48 49 50
/*
 * Build the "user:auth:ip:port" key that identifies one application instance.
 *
 * Any NULL string argument is rendered as an empty field (passing NULL to "%s"
 * is undefined behaviour). The buffer is sized from the formatted length, so
 * the key is never truncated.
 *
 * Returns a heap-allocated, NUL-terminated string owned by the caller (release
 * with free/tfree), or NULL when formatting or allocation fails.
 */
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
  const char* userStr = (user != NULL) ? user : "";
  const char* authStr = (auth != NULL) ? auth : "";
  const char* ipStr   = (ip != NULL) ? ip : "";

  // first pass only measures the formatted length
  int len = snprintf(NULL, 0, "%s:%s:%s:%d", userStr, authStr, ipStr, port);
  if (len < 0) {
    return NULL;
  }

  char* key = malloc((size_t)len + 1);
  if (key == NULL) {
    return NULL;
  }

  snprintf(key, (size_t)len + 1, "%s:%s:%s:%d", userStr, authStr, ipStr, port);
  return key;
}

H
Haojun Liao 已提交
51
static STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo);
52
static void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
53 54

TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port) {
55 56 57 58
  if (taos_init() != TSDB_CODE_SUCCESS) {
    return NULL;
  }

59 60 61 62 63
  if (!validateUserName(user)) {
    terrno = TSDB_CODE_TSC_INVALID_USER_LENGTH;
    return NULL;
  }

64
  char localDb[TSDB_DB_NAME_LEN] = {0};
65 66 67 68 69 70
  if (db != NULL) {
    if(!validateDbName(db)) {
      terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH;
      return NULL;
    }

71 72
    tstrncpy(localDb, db, sizeof(localDb));
    strdequote(localDb);
73 74
  }

75
  char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
76 77 78 79 80 81
  if (auth == NULL) {
    if (!validatePassword(pass)) {
      terrno = TSDB_CODE_TSC_INVALID_PASS_LENGTH;
      return NULL;
    }

82
    taosEncryptPass_c((uint8_t *)pass, strlen(pass), secretEncrypt);
83 84 85 86
  } else {
    tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt));
  }

87
  SCorEpSet epSet = {0};
88 89 90 91 92 93
  if (ip) {
    if (initEpSetFromCfg(ip, NULL, &epSet) < 0) {
      return NULL;
    }

    if (port) {
H
Haojun Liao 已提交
94
      epSet.epSet.eps[0].port = port;
95 96 97 98 99 100 101
    }
  } else {
    if (initEpSetFromCfg(tsFirst, tsSecond, &epSet) < 0) {
      return NULL;
    }
  }

102
  char* key = getClusterKey(user, secretEncrypt, ip, port);
H
Haojun Liao 已提交
103
  SAppInstInfo** pInst = NULL;
104

H
Haojun Liao 已提交
105 106 107
  pthread_mutex_lock(&appInfo.mutex);

  pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
108
  if (pInst == NULL) {
H
Haojun Liao 已提交
109 110
    SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo));
    p->mgmtEp       = epSet;
H
Haojun Liao 已提交
111
    p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
L
Liu Jicong 已提交
112
    /*p->pAppHbMgr = appHbMgrInit(p, key);*/
H
Haojun Liao 已提交
113
    taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
114

H
Haojun Liao 已提交
115
    pInst = &p;
116 117
  }

H
Haojun Liao 已提交
118 119
  pthread_mutex_unlock(&appInfo.mutex);

H
Haojun Liao 已提交
120
  tfree(key);
H
Haojun Liao 已提交
121
  return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst);
122 123
}

X
Xiaoyu Wang 已提交
124 125 126
/*
 * Create a request object for a SQL statement and attach a lower-cased copy of
 * the statement to it.
 *
 * pTscObj:  connection the request belongs to.
 * sql:      statement text; sqlLen bytes are read from it.
 * sqlLen:   statement length in bytes.
 * pRequest: receives the new request (NULL when creation fails). When the SQL
 *           buffer cannot be allocated the request is still returned, with
 *           msgBuf pointing at a duplicated error string.
 *
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_TSC_OUT_OF_MEMORY.
 */
int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj** pRequest) {
  SRequestObj* pReq = createRequest(pTscObj, NULL, NULL, TSDB_SQL_SELECT);
  *pRequest = pReq;
  if (pReq == NULL) {
    tscError("failed to malloc sqlObj");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pReq->sqlstr = malloc(sqlLen + 1);
  if (pReq->sqlstr == NULL) {
    tscError("0x%"PRIx64" failed to prepare sql string buffer", pReq->self);
    pReq->msgBuf = strdup("failed to prepare sql string buffer");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // store the statement lower-cased and explicitly NUL-terminated
  strntolower(pReq->sqlstr, sql, (int32_t)sqlLen);
  pReq->sqlstr[sqlLen] = 0;
  pReq->sqlLen = sqlLen;

  tscDebugL("0x%"PRIx64" SQL: %s, reqId:0x%"PRIx64, pReq->self, pReq->sqlstr, pReq->requestId);
  return TSDB_CODE_SUCCESS;
}
145

X
Xiaoyu Wang 已提交
146
/*
 * Parse the SQL text held by a request into a query node.
 *
 * The parse context is filled from the request and its connection; the
 * database name obtained from getDbOfConnection() is released before returning.
 *
 * Returns the code from catalogGetHandle() when that fails, otherwise the code
 * from qParseQuerySql().
 */
int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery) {
  STscObj* pTscObj = pRequest->pTscObj;

  SParseContext cxt = {
    .requestId    = pRequest->requestId,
    .acctId       = pTscObj->acctId,
    .db           = getDbOfConnection(pTscObj),
    .pSql         = pRequest->sqlstr,
    .sqlLen       = pRequest->sqlLen,
    .pMsg         = pRequest->msgBuf,
    .msgLen       = ERROR_MSG_BUF_DEFAULT_SIZE,
    .pTransporter = pTscObj->pAppInfo->pTransporter,
  };

  cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);

  int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
  if (code == TSDB_CODE_SUCCESS) {
    code = qParseQuerySql(&cxt, pQuery);
  }

  // single exit: the db name copy is freed on success and failure alike
  tfree(cxt.db);
  return code;
}
H
Haojun Liao 已提交
172

X
Xiaoyu Wang 已提交
173 174 175
/*
 * Send the message prepared by the parser for a DDL/DCL statement and block
 * until the response semaphore of the request is posted.
 *
 * For TDMT_VND_SHOW_TABLES the vgroup list carried in pExtension is attached to
 * the request on first use so later fetches can walk it.
 *
 * Always returns TSDB_CODE_SUCCESS; the return value of the send is not inspected.
 */
int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
  SDclStmtInfo* pDcl = (SDclStmtInfo*)pQuery;

  pRequest->type = pDcl->msgType;
  pRequest->body.requestMsg = (SDataBuf){.pData = pDcl->pMsg, .len = pDcl->msgLen};

  STscObj*      pTscObj  = pRequest->pTscObj;
  SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);

  if (pDcl->msgType == TDMT_VND_SHOW_TABLES) {
    SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
    if (pShowReqInfo->pArray == NULL) {
      pShowReqInfo->currentIndex = 0;  // set the first vnode/ then iterate the next vnode
      pShowReqInfo->pArray = pDcl->pExtension;
    }
  }

  // every message type is sent to the endpoint set chosen by the parser
  int64_t transporterId = 0;
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pDcl->epSet, &transporterId, pSendMsg);

  tsem_wait(&pRequest->body.rspSem);
  return TSDB_CODE_SUCCESS;
}
X
Xiaoyu Wang 已提交
198

199
/*
 * Build the execution DAG for a parsed query.
 *
 * For SELECT statements the result schema produced by the planner is copied
 * into the request and the request type is switched to TDMT_VND_QUERY.
 *
 * Returns the code from qCreateQueryDag().
 */
int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag, SArray* pNodeList) {
  SSchema* pSchema   = NULL;
  int32_t  numOfCols = 0;

  pRequest->type = pQueryNode->type;

  int32_t code = qCreateQueryDag(pQueryNode, pDag, &pSchema, &numOfCols, pNodeList, pRequest->requestId);
  if (code != 0) {
    return code;
  }

  if (TSDB_SQL_SELECT == pQueryNode->type) {
    setResSchemaInfo(&pRequest->body.resInfo, pSchema, numOfCols);
    pRequest->type = TDMT_VND_QUERY;
  }

  // the schema was copied above (when needed), so the planner's array can go
  tfree(pSchema);
  return code;
}

218 219
/*
 * Copy type, size and name of every column in pSchema into the result info.
 *
 * pResInfo:  result info that receives a newly allocated fields array.
 * pSchema:   source schema, must not be NULL.
 * numOfCols: number of columns, must be > 0.
 *
 * On allocation failure numOfCols is set to 0, fields stays NULL and terrno is
 * set to TSDB_CODE_TSC_OUT_OF_MEMORY.
 */
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols) {
  assert(pSchema != NULL && numOfCols > 0);

  // size each element by the destination type, not by the source schema type
  pResInfo->fields = calloc(numOfCols, sizeof(pResInfo->fields[0]));
  if (pResInfo->fields == NULL) {
    pResInfo->numOfCols = 0;
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return;
  }

  pResInfo->numOfCols = numOfCols;

  for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
    pResInfo->fields[i].bytes = pSchema[i].bytes;
    pResInfo->fields[i].type  = pSchema[i].type;
    tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
  }
}

231
/*
 * Hand a query DAG to the scheduler.
 *
 * INSERT and CREATE TABLE requests are executed synchronously through
 * schedulerExecJob(); the affected row count and result code are stored in the
 * request and the code is returned. Every other request type is submitted with
 * schedulerAsyncExecJob() and its return value is passed through.
 *
 * A failure reported only through the return value of schedulerExecJob() (with
 * res.code left at 0) is propagated instead of being dropped.
 */
int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList) {
  void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;

  if (TSDB_SQL_INSERT != pRequest->type && TSDB_SQL_CREATE_TABLE != pRequest->type) {
    return schedulerAsyncExecJob(pTransporter, pNodeList, pDag, pRequest->sqlstr, &pRequest->body.pQueryJob);
  }

  SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
  int32_t code = schedulerExecJob(pTransporter, NULL, pDag, &pRequest->body.pQueryJob, pRequest->sqlstr, &res);

  // TODO: handle error and retry when code != TSDB_CODE_SUCCESS
  if (code == TSDB_CODE_SUCCESS && pRequest->body.pQueryJob != NULL) {
    schedulerFreeJob(pRequest->body.pQueryJob);
    pRequest->body.pQueryJob = NULL;  // do not keep a handle to the released job
  }

  pRequest->body.resInfo.numOfRows = res.numOfRows;

  // prefer the code reported in the result; fall back to the call's own return value
  pRequest->code = (res.code != TSDB_CODE_SUCCESS) ? res.code : code;
  return pRequest->code;
}
X
Xiaoyu Wang 已提交
251

X
Xiaoyu Wang 已提交
252 253
/*
 * Execute a SQL statement of explicit length on a connection.
 *
 * The statement is wrapped in a request, parsed, and then either executed as a
 * DDL statement or planned and scheduled as a query.
 *
 * Returns the request object as the result handle. When a step fails, the
 * request (if one was created) carries the error in its code field; NULL is
 * returned when the statement is too long or no request could be created.
 */
TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
  STscObj *pTscObj = (STscObj *)taos;

  if (sqlLen > (size_t) TSDB_MAX_ALLOWED_SQL_LEN) {
    tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    return NULL;
  }

  nPrintTsc("%s", sql)

  SArray      *pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
  SQueryNode  *pQueryNode = NULL;
  SRequestObj *pRequest = NULL;

  terrno = TSDB_CODE_SUCCESS;
  CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
  CHECK_CODE_GOTO(parseSql(pRequest, &pQueryNode), _return);

  if (!qIsDdlQuery(pQueryNode)) {
    // regular query: build the plan, then hand it to the scheduler
    CHECK_CODE_GOTO(getPlan(pRequest, pQueryNode, &pRequest->body.pDag, pNodeList), _return);
    CHECK_CODE_GOTO(scheduleQuery(pRequest, pRequest->body.pDag, pNodeList), _return);
    pRequest->code = terrno;
  } else {
    CHECK_CODE_GOTO(execDdlQuery(pRequest, pQueryNode), _return);
  }

_return:
  // shared cleanup for success and failure
  taosArrayDestroy(pNodeList);
  qDestroyQuery(pQueryNode);

  if (TSDB_CODE_SUCCESS != terrno && NULL != pRequest) {
    pRequest->code = terrno;
  }

  return pRequest;
}

288
/*
 * Initialize an endpoint set from up to two "fqdn:port" endpoint strings.
 *
 * firstEp / secondEp: endpoint strings; NULL or empty entries are skipped.
 * pEpSet:             receives the parsed endpoints; version, numOfEps and
 *                     inUse are reset first.
 *
 * Returns 0 when at least one endpoint was added. Returns -1 with terrno set to
 * TSDB_CODE_TSC_INVALID_FQDN when an endpoint is TSDB_EP_LEN characters or
 * longer, or when neither endpoint is usable.
 *
 * The signature is spelled exactly like the forward declaration of this
 * function (static int32_t).
 */
static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet) {
  pEpSet->version = 0;

  // init mnode ip set
  SEpSet *mgmtEpSet   = &(pEpSet->epSet);
  mgmtEpSet->numOfEps = 0;
  mgmtEpSet->inUse    = 0;

  const char *candidates[] = {firstEp, secondEp};
  for (size_t i = 0; i < sizeof(candidates) / sizeof(candidates[0]); ++i) {
    const char *ep = candidates[i];
    if (ep == NULL || ep[0] == 0) {
      continue;
    }

    if (strlen(ep) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
    }

    // each accepted endpoint takes the next free slot
    taosGetFqdnPortFromEp(ep, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
    mgmtEpSet->numOfEps++;
  }

  if (mgmtEpSet->numOfEps == 0) {
    terrno = TSDB_CODE_TSC_INVALID_FQDN;
    return -1;
  }

  return 0;
}

H
Haojun Liao 已提交
324
/*
 * Create a connection object, send the connect message to the management
 * endpoints of pAppInfo and wait for the response.
 *
 * user, auth, db: credentials and default database stored in the connection.
 * fp, param:      callback and user data handed to the connect request.
 * pAppInfo:       application instance whose transporter and endpoints are used.
 *
 * Returns the connection on success. Returns NULL when an object or the
 * connect message cannot be created (terrno is set), or when the server
 * rejects the connection (the reason is printed).
 */
STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo) {
  STscObj *pTscObj = createTscObj(user, auth, db, pAppInfo);
  if (NULL == pTscObj) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return pTscObj;
  }

  SRequestObj *pRequest = createRequest(pTscObj, fp, param, TDMT_MND_CONNECT);
  if (pRequest == NULL) {
    destroyTscObj(pTscObj);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  SMsgSendInfo* body = buildConnectMsg(pRequest);
  if (body == NULL) {
    // Nothing can be sent, so no response would ever post the semaphore.
    // Release everything and keep the error code set by buildConnectMsg().
    int32_t code = terrno;
    destroyRequest(pRequest);
    taos_close(pTscObj);
    terrno = code;
    return NULL;
  }

  int64_t transporterId = 0;
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);

  tsem_wait(&pRequest->body.rspSem);
  if (pRequest->code != TSDB_CODE_SUCCESS) {
    const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
    printf("failed to connect to server, reason: %s\n\n", errorMsg);

    destroyRequest(pRequest);
    taos_close(pTscObj);
    pTscObj = NULL;
  } else {
    tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p, reqId:0x%"PRIx64, pTscObj->id, pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId);
    destroyRequest(pRequest);
  }

  return pTscObj;
}

359 360 361 362 363 364 365
/*
 * Allocate and fill the send-info and payload of a connect request.
 *
 * The payload carries the connection's current database (if any) plus the
 * client pid, start time and application name taken from appInfo.
 *
 * Returns the send-info, which owns the payload, or NULL with terrno set to
 * TSDB_CODE_TSC_OUT_OF_MEMORY when an allocation fails.
 */
static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest) {
  SMsgSendInfo *pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  // envelope: message type, identifiers and response handler
  pMsgSendInfo->msgType         = TDMT_MND_CONNECT;
  pMsgSendInfo->msgInfo.len     = sizeof(SConnectReq);
  pMsgSendInfo->requestObjRefId = pRequest->self;
  pMsgSendInfo->requestId       = pRequest->requestId;
  pMsgSendInfo->fp              = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)];
  pMsgSendInfo->param           = pRequest;

  SConnectReq *pConnect = calloc(1, sizeof(SConnectReq));
  if (pConnect == NULL) {
    tfree(pMsgSendInfo);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  // payload: default database (when one is set) ...
  char* db = getDbOfConnection(pRequest->pTscObj);
  if (db != NULL) {
    tstrncpy(pConnect->db, db, sizeof(pConnect->db));
  }
  tfree(db);

  // ... and client identification, in network byte order
  pConnect->pid       = htonl(appInfo.pid);
  pConnect->startTime = htobe64(appInfo.startTime);
  tstrncpy(pConnect->app, appInfo.appName, tListLen(pConnect->app));

  pMsgSendInfo->msgInfo.pData = pConnect;
  return pMsgSendInfo;
}

396
/* Release a send-info together with the payload buffer it owns. pMsgBody must not be NULL. */
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
  assert(pMsgBody != NULL);

  // payload first, then the envelope that points at it
  tfree(pMsgBody->msgInfo.pData);
  tfree(pMsgBody);
}

402
/*
 * Callback for a response arriving from the server.
 *
 * parent: not used by this function.
 * pMsg:   response message; ahandle must be the SMsgSendInfo of the request
 *         that was sent. Its content is released here with rpcFreeCont().
 * pEpSet: optional updated endpoint set; when given and different from the
 *         cached management endpoints, the cache is updated.
 *
 * The response body is copied into a private buffer and passed, with the
 * response code, to the handler registered in the send-info. The send-info is
 * destroyed afterwards. If the copy cannot be allocated the handler receives
 * an empty buffer (pData NULL, len 0) and TSDB_CODE_OUT_OF_MEMORY.
 */
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  assert(pMsg->ahandle != NULL);
  SMsgSendInfo *pSendInfo = (SMsgSendInfo *) pMsg->ahandle;

  if (pSendInfo->requestObjRefId != 0) {
    SRequestObj *pRequest = (SRequestObj *)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
    assert(pRequest->self == pSendInfo->requestObjRefId);

    pRequest->metric.rsp = taosGetTimestampMs();
    pRequest->code = pMsg->code;

    STscObj *pTscObj = pRequest->pTscObj;
    if (pEpSet) {
      if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, pEpSet)) {
        updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
      }
    }

    /*
     * There is not response callback function for submit response.
     * The actual inserted number of points is the first number.
     */
    int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start;
    if (pMsg->code == TSDB_CODE_SUCCESS) {
      tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%"PRIx64, pRequest->self,
          TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
    } else {
      tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%"PRIx64, pRequest->self,
          TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
    }

    taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
  }

  SDataBuf buf = {.len = pMsg->contLen, .pData = NULL};

  if (pMsg->contLen > 0) {
    buf.pData = calloc(1, pMsg->contLen);
    if (buf.pData == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
      buf.len = 0;  // never advertise bytes that are not there
    } else {
      memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    }
  }

  pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pSendInfo);
}
452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472

/*
 * Connect with a pre-computed auth secret instead of a plain password.
 *
 * ip:   endpoint, may be NULL.
 * user: user name; TSDB_DEFAULT_USER is used when NULL.
 * auth: secret, required; NULL makes the call fail.
 * db:   optional database name, may be NULL.
 * port: server port.
 *
 * Returns the connection, or NULL on failure.
 */
TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port) {
  if (user == NULL) {
    user = TSDB_DEFAULT_USER;
  }

  // ip and db are optional; passing NULL to "%s" is undefined behaviour
  tscDebug("try to connect to %s:%u by auth, user:%s db:%s", (ip != NULL) ? ip : "(null)", port, user,
           (db != NULL) ? db : "(null)");

  if (auth == NULL) {
    tscError("No auth info is given, failed to connect to server");
    return NULL;
  }

  return taos_connect_internal(ip, user, NULL, auth, db, port);
}

/*
 * Connect using strings given as pointer + length (not necessarily
 * NUL-terminated).
 *
 * Each argument is copied into a zero-initialized local buffer, truncated to
 * the buffer capacity minus one, so the copy is always NUL-terminated. A NULL
 * pointer or a non-positive length leaves the corresponding buffer empty; a
 * NULL pointer is additionally forwarded to taos_connect() as NULL so that
 * "not provided" is not confused with an empty string.
 * NOTE(review): assumes taos_connect() accepts NULL arguments — confirm.
 *
 * Returns whatever taos_connect() returns.
 */
TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen, const char *db, int dbLen, uint16_t port) {
  char ipStr[TSDB_EP_LEN]         = {0};
  char dbStr[TSDB_DB_NAME_LEN]    = {0};
  char userStr[TSDB_USER_LEN]     = {0};
  char passStr[TSDB_PASSWORD_LEN] = {0};

  // guard against NULL sources and negative lengths (which would convert to huge sizes)
  if (ip != NULL && ipLen > 0) {
    strncpy(ipStr, ip, TMIN(TSDB_EP_LEN - 1, ipLen));
  }

  if (user != NULL && userLen > 0) {
    strncpy(userStr, user, TMIN(TSDB_USER_LEN - 1, userLen));
  }

  if (pass != NULL && passLen > 0) {
    strncpy(passStr, pass, TMIN(TSDB_PASSWORD_LEN - 1, passLen));
  }

  if (db != NULL && dbLen > 0) {
    strncpy(dbStr, db, TMIN(TSDB_DB_NAME_LEN - 1, dbLen));
  }

  return taos_connect((ip != NULL) ? ipStr : NULL, (user != NULL) ? userStr : NULL, (pass != NULL) ? passStr : NULL,
                      (db != NULL) ? dbStr : NULL, port);
}

/*
 * Return the next row of the result attached to a request, fetching the next
 * batch from the server when the current one is exhausted.
 *
 * How the next batch is obtained depends on the request type:
 *  - TDMT_VND_QUERY:            rows are pulled from the scheduler job.
 *  - TDMT_MND_SHOW:             switches to TDMT_MND_SHOW_RETRIEVE and asks the mnode.
 *  - TDMT_VND_SHOW_TABLES:      switches to TDMT_VND_SHOW_TABLES_FETCH and asks the
 *                               current vgroup.
 *  - TDMT_VND_SHOW_TABLES_FETCH: moves to the next vgroup, re-issues the show
 *                               request there, then fetches from it.
 *
 * Returns the row pointer array, or NULL when there are no more rows or an
 * error occurred (for scheduler and allocation failures the request's code
 * field is set).
 */
void* doFetchRow(SRequestObj* pRequest) {
  assert(pRequest != NULL);
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;

  SEpSet epSet = {0};

  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
    if (pRequest->type == TDMT_VND_QUERY) {
      // All data has returned to App already, no need to try again
      if (pResultInfo->completed) {
        return NULL;
      }

      int32_t code = schedulerFetchRows(pRequest->body.pQueryJob, (void **)&pResultInfo->pData);
      if (code != TSDB_CODE_SUCCESS) {
        pRequest->code = code;
        return NULL;
      }

      setQueryResultFromRsp(pResultInfo, (SRetrieveTableRsp*)pResultInfo->pData);
      tscDebug("0x%"PRIx64 " fetch results, numOfRows:%d total Rows:%"PRId64", complete:%d, reqId:0x%"PRIx64, pRequest->self, pResultInfo->numOfRows,
               pResultInfo->totalRows, pResultInfo->completed, pRequest->requestId);

      if (pResultInfo->numOfRows == 0) {
        return NULL;
      }

      goto _return;
    } else if (pRequest->type == TDMT_MND_SHOW) {
      pRequest->type = TDMT_MND_SHOW_RETRIEVE;
      epSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
    } else if (pRequest->type == TDMT_VND_SHOW_TABLES) {
      pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
      SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
      SVgroupInfo* pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex);

      epSet = pVgroupInfo->epset;
    } else if (pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) {
      pRequest->type = TDMT_VND_SHOW_TABLES;
      SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
      pShowReqInfo->currentIndex += 1;
      if (pShowReqInfo->currentIndex >= taosArrayGetSize(pShowReqInfo->pArray)) {
        return NULL;
      }

      SVgroupInfo* pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex);
      SVShowTablesReq* pShowReq = calloc(1, sizeof(SVShowTablesReq));
      if (pShowReq == NULL) {
        // undo the state changes made above so the request still points at the
        // vgroup it was on, then report the failure
        pShowReqInfo->currentIndex -= 1;
        pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
        pRequest->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
        terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
        return NULL;
      }
      pShowReq->head.vgId = htonl(pVgroupInfo->vgId);

      pRequest->body.requestMsg.len = sizeof(SVShowTablesReq);
      pRequest->body.requestMsg.pData = pShowReq;

      // first re-issue the show request on the next vgroup ...
      SMsgSendInfo* pShowMsg = buildMsgInfoImpl(pRequest);
      epSet = pVgroupInfo->epset;

      int64_t  showTransporterId = 0;
      STscObj *pShowTscObj = pRequest->pTscObj;
      asyncSendMsgToServer(pShowTscObj->pAppInfo->pTransporter, &epSet, &showTransporterId, pShowMsg);
      tsem_wait(&pRequest->body.rspSem);

      // ... then fall through to the common fetch below
      pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
    } else if (pRequest->type == TDMT_MND_SHOW_RETRIEVE && pResultInfo->pData != NULL) {
      return NULL;
    }

    SMsgSendInfo* body = buildMsgInfoImpl(pRequest);

    int64_t  transporterId = 0;
    STscObj *pTscObj = pRequest->pTscObj;
    asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body);

    tsem_wait(&pRequest->body.rspSem);

    pResultInfo->current = 0;
    if (pResultInfo->numOfRows <= pResultInfo->current) {
      return NULL;
    }
  }

_return:

  // point every column of the row at its cell for the current row index
  for(int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
    pResultInfo->row[i] = pResultInfo->pCol[i] + pResultInfo->fields[i].bytes * pResultInfo->current;
    if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
      // variable-length cell: expose the payload and its actual length
      pResultInfo->length[i] = varDataLen(pResultInfo->row[i]);
      pResultInfo->row[i] = varDataVal(pResultInfo->row[i]);
    }
  }

  pResultInfo->current += 1;
  return pResultInfo->row;
}

H
Haojun Liao 已提交
574 575 576 577 578 579 580 581
/*
 * Allocate the per-column pointer and length arrays of a result, once.
 * Nothing is done when the row array already exists. Allocation results are
 * not checked here.
 */
static void doPrepareResPtr(SReqResultInfo* pResInfo) {
  if (pResInfo->row != NULL) {
    return;  // already prepared
  }

  pResInfo->row    = calloc(pResInfo->numOfCols, POINTER_BYTES);
  pResInfo->pCol   = calloc(pResInfo->numOfCols, POINTER_BYTES);
  pResInfo->length = calloc(pResInfo->numOfCols, sizeof(int32_t));
}

582
/*
 * Point the per-column arrays of a result at the column blocks inside pData.
 *
 * Column i starts at pData + (sum of the byte widths of columns 0..i-1) *
 * numOfRows, using the widths and row count stored in pResultInfo. Nothing is
 * done for an empty batch.
 */
void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) {
  assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL);
  if (numOfRows == 0) {
    return;
  }

  // todo check for the failure of malloc
  doPrepareResPtr(pResultInfo);

  int32_t offset = 0;  // accumulated byte width of the columns handled so far
  int32_t i = 0;
  while (i < numOfCols) {
    pResultInfo->length[i] = pResultInfo->fields[i].bytes;
    pResultInfo->row[i]    = (char*) (pResultInfo->pData + offset * pResultInfo->numOfRows);
    pResultInfo->pCol[i]   = pResultInfo->row[i];

    offset += pResultInfo->fields[i].bytes;
    ++i;
  }
}

H
Haojun Liao 已提交
600
/*
 * Return a heap-allocated copy of the connection's current database name,
 * taken while holding the connection mutex.
 *
 * Returns NULL when no database is set (or the copy fails); otherwise the
 * caller owns the returned string and must free it.
 */
char* getDbOfConnection(STscObj* pObj) {
  char *dbCopy = NULL;

  pthread_mutex_lock(&pObj->mutex);
  if (pObj->db[0] != '\0') {  // non-empty name
    dbCopy = strndup(pObj->db, tListLen(pObj->db));
  }
  pthread_mutex_unlock(&pObj->mutex);

  return dbCopy;
}

/*
 * Replace the connection's current database name with db, under the
 * connection mutex. Neither argument may be NULL.
 */
void setConnectionDB(STscObj* pTscObj, const char* db) {
  assert(db != NULL && pTscObj != NULL);

  pthread_mutex_t* pLock = &pTscObj->mutex;
  pthread_mutex_lock(pLock);
  tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
  pthread_mutex_unlock(pLock);
}
S
Shengliang Guan 已提交
618

619
/*
 * Install a retrieve response as the current batch of a result.
 *
 * The response is remembered, the batch row count is converted from network
 * byte order and added to the running total, the read cursor is reset, and the
 * per-column pointers are set up for the new data.
 */
void setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp) {
  assert(pResultInfo != NULL && pRsp != NULL);

  // the result keeps pointers into the response message
  pResultInfo->pRspMsg = (const char*) pRsp;
  pResultInfo->pData   = (void*) pRsp->data;

  // batch bookkeeping
  pResultInfo->numOfRows  = htonl(pRsp->numOfRows);
  pResultInfo->totalRows += pResultInfo->numOfRows;
  pResultInfo->current    = 0;
  pResultInfo->completed  = (pRsp->completed == 1);

  setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows);
}