clientImpl.c 21.0 KB
Newer Older
1

2
#include "../../libs/scheduler/inc/schedulerInt.h"
3
#include "clientInt.h"
4
#include "clientLog.h"
5 6 7
#include "parser.h"
#include "planner.h"
#include "scheduler.h"
8 9
#include "tdef.h"
#include "tep.h"
10
#include "tglobal.h"
11
#include "tmsgtype.h"
12 13
#include "tnote.h"
#include "tpagedfile.h"
14
#include "tref.h"
X
Xiaoyu Wang 已提交
15 16

#define CHECK_CODE_GOTO(expr, lable) \
H
Haojun Liao 已提交
17 18
  do {                               \
    int32_t code = expr;             \
X
Xiaoyu Wang 已提交
19
    if (TSDB_CODE_SUCCESS != code) { \
H
Haojun Liao 已提交
20 21 22
      terrno = code;                 \
      goto lable;                    \
    }                                \
X
Xiaoyu Wang 已提交
23
  } while (0)
24

25
static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
26 27
static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest);
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
28
static void setQueryResultByRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp);
H
Haojun Liao 已提交
29

H
Haojun Liao 已提交
30
  static bool stringLengthCheck(const char* str, size_t maxsize) {
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
  if (str == NULL) {
    return false;
  }

  size_t len = strlen(str);
  if (len <= 0 || len > maxsize) {
    return false;
  }

  return true;
}

static bool validateUserName(const char* user) {
  return stringLengthCheck(user, TSDB_USER_LEN - 1);
}

static bool validatePassword(const char* passwd) {
48
  return stringLengthCheck(passwd, TSDB_PASSWORD_LEN - 1);
49 50 51 52 53 54
}

static bool validateDbName(const char* db) {
  return stringLengthCheck(db, TSDB_DB_NAME_LEN - 1);
}

55 56 57 58 59 60 61
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
  char key[512] = {0};
  snprintf(key, sizeof(key), "%s:%s:%s:%d", user, auth, ip, port);
  return strdup(key);
}

static STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, const char *db, uint16_t port, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo);
62
static void  setResSchemaInfo(SReqResultInfo* pResInfo, const SDataBlockSchema* pDataBlockSchema);
63 64

TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port) {
65 66 67 68
  if (taos_init() != TSDB_CODE_SUCCESS) {
    return NULL;
  }

69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
  if (!validateUserName(user)) {
    terrno = TSDB_CODE_TSC_INVALID_USER_LENGTH;
    return NULL;
  }

  char tmp[TSDB_DB_NAME_LEN] = {0};
  if (db != NULL) {
    if(!validateDbName(db)) {
      terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH;
      return NULL;
    }

    tstrncpy(tmp, db, sizeof(tmp));
    strdequote(tmp);
  }

85
  char secretEncrypt[32] = {0};
86 87 88 89 90 91
  if (auth == NULL) {
    if (!validatePassword(pass)) {
      terrno = TSDB_CODE_TSC_INVALID_PASS_LENGTH;
      return NULL;
    }

92
    taosEncryptPass_c((uint8_t *)pass, strlen(pass), secretEncrypt);
93 94 95 96
  } else {
    tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt));
  }

97
  SCorEpSet epSet = {0};
98 99 100 101 102 103 104 105 106 107 108 109 110 111
  if (ip) {
    if (initEpSetFromCfg(ip, NULL, &epSet) < 0) {
      return NULL;
    }

    if (port) {
      epSet.epSet.port[0] = port;
    }
  } else {
    if (initEpSetFromCfg(tsFirst, tsSecond, &epSet) < 0) {
      return NULL;
    }
  }

112 113
  char* key = getClusterKey(user, secretEncrypt, ip, port);

H
Haojun Liao 已提交
114
  SAppInstInfo** pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
115
  if (pInst == NULL) {
H
Haojun Liao 已提交
116 117
    SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo));
    p->mgmtEp       = epSet;
H
Haojun Liao 已提交
118
    p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
H
Haojun Liao 已提交
119
    taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
120

H
Haojun Liao 已提交
121
    pInst = &p;
122 123
  }

H
Haojun Liao 已提交
124 125
  tfree(key);
  return taosConnectImpl(ip, user, &secretEncrypt[0], db, port, NULL, NULL, *pInst);
126 127
}

X
Xiaoyu Wang 已提交
128 129 130
int32_t buildRequest(STscObj *pTscObj, const char *sql, int sqlLen, SRequestObj** pRequest) {
  *pRequest = createRequest(pTscObj, NULL, NULL, TSDB_SQL_SELECT);
  if (*pRequest == NULL) {
131
    tscError("failed to malloc sqlObj");
X
Xiaoyu Wang 已提交
132
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
133 134
  }

X
Xiaoyu Wang 已提交
135 136 137 138 139
  (*pRequest)->sqlstr = malloc(sqlLen + 1);
  if ((*pRequest)->sqlstr == NULL) {
    tscError("0x%"PRIx64" failed to prepare sql string buffer", (*pRequest)->self);
    (*pRequest)->msgBuf = strdup("failed to prepare sql string buffer");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
140 141
  }

X
Xiaoyu Wang 已提交
142 143 144
  strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen);
  (*pRequest)->sqlstr[sqlLen] = 0;
  (*pRequest)->sqlLen = sqlLen;
145

H
Haojun Liao 已提交
146
  tscDebugL("0x%"PRIx64" SQL: %s, reqId:0x%"PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId);
X
Xiaoyu Wang 已提交
147 148
  return TSDB_CODE_SUCCESS;
}
149

X
Xiaoyu Wang 已提交
150
int32_t parseSql(SRequestObj* pRequest, SQueryNode** pQuery) {
H
Haojun Liao 已提交
151 152
  STscObj* pTscObj = pRequest->pTscObj;

X
Xiaoyu Wang 已提交
153
  SParseContext cxt = {
H
Haojun Liao 已提交
154
    .ctx = {.requestId = pRequest->requestId, .acctId = pTscObj->acctId, .db = getConnectionDB(pTscObj), .pTransporter = pTscObj->pTransporter},
155
    .pSql   = pRequest->sqlstr,
X
Xiaoyu Wang 已提交
156
    .sqlLen = pRequest->sqlLen,
157
    .pMsg   = pRequest->msgBuf,
X
Xiaoyu Wang 已提交
158 159
    .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE
  };
H
Haojun Liao 已提交
160

H
Haojun Liao 已提交
161
  cxt.ctx.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
162
  int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.ctx.pCatalog);
H
Haojun Liao 已提交
163 164 165 166 167 168
  if (code != TSDB_CODE_SUCCESS) {
    tfree(cxt.ctx.db);
    return code;
  }

  code = qParseQuerySql(&cxt, pQuery);
169

X
Xiaoyu Wang 已提交
170 171 172
  tfree(cxt.ctx.db);
  return code;
}
H
Haojun Liao 已提交
173

X
Xiaoyu Wang 已提交
174 175 176
int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
  SDclStmtInfo* pDcl = (SDclStmtInfo*)pQuery;
  pRequest->type = pDcl->msgType;
H
Haojun Liao 已提交
177
  pRequest->body.requestMsg = (SDataBuf){.pData = pDcl->pMsg, .len = pDcl->msgLen};
X
Xiaoyu Wang 已提交
178

H
Haojun Liao 已提交
179
  STscObj* pTscObj = pRequest->pTscObj;
180
  SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
X
Xiaoyu Wang 已提交
181

182
  int64_t transporterId = 0;
183 184 185 186
  if (pDcl->msgType == TDMT_VND_CREATE_TABLE || pDcl->msgType == TDMT_VND_SHOW_TABLES) {
    if (pDcl->msgType == TDMT_VND_SHOW_TABLES) {
      SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
      if (pShowReqInfo->pArray == NULL) {
H
Haojun Liao 已提交
187
        pShowReqInfo->currentIndex = 0;  // set the first vnode/ then iterate the next vnode
188 189 190
        pShowReqInfo->pArray = pDcl->pExtension;
      }
    }
191
    asyncSendMsgToServer(pTscObj->pTransporter, &pDcl->epSet, &transporterId, pSendMsg);
X
Xiaoyu Wang 已提交
192
  } else {
193
    SEpSet* pEpSet = &pTscObj->pAppInfo->mgmtEp.epSet;
194
    asyncSendMsgToServer(pTscObj->pTransporter, pEpSet, &transporterId, pSendMsg);
195 196
  }

X
Xiaoyu Wang 已提交
197 198 199
  tsem_wait(&pRequest->body.rspSem);
  return TSDB_CODE_SUCCESS;
}
X
Xiaoyu Wang 已提交
200

201 202
int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag) {
  pRequest->type = pQueryNode->type;
203 204

  SReqResultInfo* pResInfo = &pRequest->body.resInfo;
205
  int32_t code = qCreateQueryDag(pQueryNode, pDag, pRequest->requestId);
206 207 208 209 210
  if (code != 0) {
    return code;
  }

  if (pQueryNode->type == TSDB_SQL_SELECT) {
211 212 213 214 215
    SArray* pa = taosArrayGetP((*pDag)->pSubplans, 0);

    SSubplan* pPlan = taosArrayGetP(pa, 0);
    SDataBlockSchema* pDataBlockSchema = &(pPlan->pDataSink->schema);
    setResSchemaInfo(pResInfo, pDataBlockSchema);
H
Haojun Liao 已提交
216

217
    pRequest->type = TDMT_VND_QUERY;
218 219 220
  }

  return code;
X
Xiaoyu Wang 已提交
221 222
}

223 224 225 226 227 228 229 230 231 232
void setResSchemaInfo(SReqResultInfo* pResInfo, const SDataBlockSchema* pDataBlockSchema) {
  assert(pDataBlockSchema != NULL && pDataBlockSchema->numOfCols > 0);

  pResInfo->numOfCols = pDataBlockSchema->numOfCols;
  pResInfo->fields = calloc(pDataBlockSchema->numOfCols, sizeof(pDataBlockSchema->pSchema[0]));

  for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
    SSchema* pSchema = &pDataBlockSchema->pSchema[i];
    pResInfo->fields[i].bytes = pSchema->bytes;
    pResInfo->fields[i].type  = pSchema->type;
233
    tstrncpy(pResInfo->fields[i].name, pSchema->name, tListLen(pResInfo->fields[i].name));
234 235 236 237
  }
}

int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) {
238
  if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) {
X
Xiaoyu Wang 已提交
239
    SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
240

241
    int32_t code = scheduleExecJob(pRequest->pTscObj->pTransporter, NULL, pDag, &pRequest->body.pQueryJob, &res);
242 243 244
    if (code != TSDB_CODE_SUCCESS) {
      // handle error and retry
    } else {
245 246
      if (pRequest->body.pQueryJob != NULL) {
        scheduleFreeJob(pRequest->body.pQueryJob);
247 248 249
      }
    }

X
Xiaoyu Wang 已提交
250 251
    pRequest->affectedRows = res.numOfRows;
    return res.code;
X
Xiaoyu Wang 已提交
252
  }
253

254
  return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL, pDag, &pRequest->body.pQueryJob);
X
Xiaoyu Wang 已提交
255
}
X
Xiaoyu Wang 已提交
256

L
Liu Jicong 已提交
257 258 259 260 261 262 263 264 265 266
TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen) {
  STscObj* pTscObj = (STscObj*)taos;
  SRequestObj* pRequest = NULL;
  SQueryNode*  pQuery = NULL;
  SQueryDag*   pDag = NULL;
  char *dagStr = NULL;

  terrno = TSDB_CODE_SUCCESS;

  CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
L
Liu Jicong 已提交
267 268 269

//temporary disabled until planner ready
#if 0
L
Liu Jicong 已提交
270 271 272 273 274 275 276 277 278
  CHECK_CODE_GOTO(parseSql(pRequest, &pQuery), _return);
  //TODO: check sql valid

  CHECK_CODE_GOTO(qCreateQueryDag(pQuery, &pDag), _return);

  dagStr = qDagToString(pDag);
  if(dagStr == NULL) {
    //TODO
  }
L
Liu Jicong 已提交
279
#endif
L
Liu Jicong 已提交
280 281 282 283

  SCMCreateTopicReq req = {
    .name = (char*)name,
    .igExists = 0,
L
Liu Jicong 已提交
284 285 286
    /*.physicalPlan = dagStr,*/
    .physicalPlan = (char*)sql,
    .logicalPlan = "",
L
Liu Jicong 已提交
287 288
  };

L
Liu Jicong 已提交
289 290 291 292 293 294 295 296
  int tlen = tSerializeSCMCreateTopicReq(NULL, &req);
  void* buf = malloc(tlen);
  if(buf == NULL) {
    goto _return;
  }
  void* abuf = buf;
  tSerializeSCMCreateTopicReq(&abuf, &req);
  /*printf("formatted: %s\n", dagStr);*/
L
Liu Jicong 已提交
297 298 299

  pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen };

300
  SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
L
Liu Jicong 已提交
301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317
  SEpSet* pEpSet = &pTscObj->pAppInfo->mgmtEp.epSet;

  int64_t transporterId = 0;
  asyncSendMsgToServer(pTscObj->pTransporter, pEpSet, &transporterId, body);

  tsem_wait(&pRequest->body.rspSem);

_return:
  qDestroyQuery(pQuery);
  qDestroyQueryDag(pDag); 
  destroySendMsgInfo(body);
  if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
    pRequest->code = terrno;
  }
  return pRequest;
}

X
Xiaoyu Wang 已提交
318 319 320 321 322 323
TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
  STscObj *pTscObj = (STscObj *)taos;
  if (sqlLen > (size_t) tsMaxSQLStringLen) {
    tscError("sql string exceeds max length:%d", tsMaxSQLStringLen);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    return NULL;
324 325
  }

X
Xiaoyu Wang 已提交
326 327
  nPrintTsc("%s", sql)

H
Haojun Liao 已提交
328 329 330
  SRequestObj *pRequest = NULL;
  SQueryNode  *pQuery   = NULL;
  SQueryDag   *pDag     = NULL;
X
Xiaoyu Wang 已提交
331

X
Xiaoyu Wang 已提交
332
  terrno = TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
333 334
  CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
  CHECK_CODE_GOTO(parseSql(pRequest, &pQuery), _return);
H
Haojun Liao 已提交
335

X
Xiaoyu Wang 已提交
336 337
  if (qIsDdlQuery(pQuery)) {
    CHECK_CODE_GOTO(execDdlQuery(pRequest, pQuery), _return);
H
Haojun Liao 已提交
338
  } else {
X
Xiaoyu Wang 已提交
339
    CHECK_CODE_GOTO(getPlan(pRequest, pQuery, &pDag), _return);
340
    CHECK_CODE_GOTO(scheduleQuery(pRequest, pDag), _return);
X
Xiaoyu Wang 已提交
341
    pRequest->code = terrno;
X
Xiaoyu Wang 已提交
342 343 344
  }

_return:
L
Liu Jicong 已提交
345
  qDestroyQuery(pQuery);
X
Xiaoyu Wang 已提交
346
  qDestroyQueryDag(pDag);
X
Xiaoyu Wang 已提交
347
  if (NULL != pRequest && TSDB_CODE_SUCCESS != terrno) {
X
Xiaoyu Wang 已提交
348 349
    pRequest->code = terrno;
  }
350

351 352 353
  return pRequest;
}

354
int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet) {
355 356
  pEpSet->version = 0;

H
Haojun Liao 已提交
357
  // init mnode ip set
358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389
  SEpSet *mgmtEpSet   = &(pEpSet->epSet);
  mgmtEpSet->numOfEps = 0;
  mgmtEpSet->inUse    = 0;

  if (firstEp && firstEp[0] != 0) {
    if (strlen(firstEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
    }

    taosGetFqdnPortFromEp(firstEp, mgmtEpSet->fqdn[0], &(mgmtEpSet->port[0]));
    mgmtEpSet->numOfEps++;
  }

  if (secondEp && secondEp[0] != 0) {
    if (strlen(secondEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
    }

    taosGetFqdnPortFromEp(secondEp, mgmtEpSet->fqdn[mgmtEpSet->numOfEps], &(mgmtEpSet->port[mgmtEpSet->numOfEps]));
    mgmtEpSet->numOfEps++;
  }

  if (mgmtEpSet->numOfEps == 0) {
    terrno = TSDB_CODE_TSC_INVALID_FQDN;
    return -1;
  }

  return 0;
}

390
STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, const char *db, uint16_t port, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo) {
391
  STscObj *pTscObj = createTscObj(user, auth, db, pAppInfo);
392 393 394 395 396
  if (NULL == pTscObj) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return pTscObj;
  }

H
Hongze Cheng 已提交
397
  SRequestObj *pRequest = createRequest(pTscObj, fp, param, TDMT_MND_CONNECT);
398 399 400
  if (pRequest == NULL) {
    destroyTscObj(pTscObj);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
401 402 403
    return NULL;
  }

404
  SMsgSendInfo* body = buildConnectMsg(pRequest);
405 406

  int64_t transporterId = 0;
H
Haojun Liao 已提交
407
  asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
408 409 410 411 412 413 414 415 416 417

  tsem_wait(&pRequest->body.rspSem);
  if (pRequest->code != TSDB_CODE_SUCCESS) {
    const char *errorMsg = (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(terrno);
    printf("failed to connect to server, reason: %s\n\n", errorMsg);

    destroyRequest(pRequest);
    taos_close(pTscObj);
    pTscObj = NULL;
  } else {
418
    tscDebug("0x%"PRIx64" connection is opening, connId:%d, dnodeConn:%p, reqId:0x%"PRIx64, pTscObj->id, pTscObj->connId, pTscObj->pTransporter, pRequest->requestId);
419 420 421 422 423 424
    destroyRequest(pRequest);
  }

  return pTscObj;
}

425 426 427 428 429 430 431
static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest) {
  SMsgSendInfo *pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

H
Haojun Liao 已提交
432
  pMsgSendInfo->msgType         = TDMT_MND_CONNECT;
S
Shengliang Guan 已提交
433
  pMsgSendInfo->msgInfo.len     = sizeof(SConnectReq);
434 435
  pMsgSendInfo->requestObjRefId = pRequest->self;
  pMsgSendInfo->requestId       = pRequest->requestId;
D
catalog  
dapan1121 已提交
436
  pMsgSendInfo->fp              = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)];
437
  pMsgSendInfo->param           = pRequest;
438

S
Shengliang Guan 已提交
439
  SConnectReq *pConnect = calloc(1, sizeof(SConnectReq));
440
  if (pConnect == NULL) {
441
    tfree(pMsgSendInfo);
442
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
443
    return NULL;
444 445
  }

446 447
  STscObj *pObj = pRequest->pTscObj;

448
  char* db = getConnectionDB(pObj);
449
  tstrncpy(pConnect->db, db, sizeof(pConnect->db));
450
  tfree(db);
451

452 453 454
  pConnect->pid = htonl(appInfo.pid);
  pConnect->startTime = htobe64(appInfo.startTime);
  tstrncpy(pConnect->app, appInfo.appName, tListLen(pConnect->app));
455

456 457
  pMsgSendInfo->msgInfo.pData = pConnect;
  return pMsgSendInfo;
458 459
}

460
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
H
Haojun Liao 已提交
461
  assert(pMsgBody != NULL);
462 463
  tfree(pMsgBody->msgInfo.pData);
  tfree(pMsgBody);
H
Haojun Liao 已提交
464 465
}

466
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
467 468
  SMsgSendInfo *pSendInfo = (SMsgSendInfo *) pMsg->ahandle;
  assert(pMsg->ahandle != NULL);
469

470
  if (pSendInfo->requestObjRefId != 0) {
H
Haojun Liao 已提交
471
    SRequestObj *pRequest = (SRequestObj *)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
472
    assert(pRequest->self == pSendInfo->requestObjRefId);
473

474 475
    pRequest->metric.rsp = taosGetTimestampMs();
    pRequest->code = pMsg->code;
476

477 478 479 480 481
    STscObj *pTscObj = pRequest->pTscObj;
    if (pEpSet) {
      if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, pEpSet)) {
        updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
      }
482 483
    }

484
    /*
485 486
   * There is not response callback function for submit response.
   * The actual inserted number of points is the first number.
487
     */
488
    int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start;
489
    if (pMsg->code == TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
490
      tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%"PRIx64, pRequest->self,
491
          TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
492
    } else {
H
Haojun Liao 已提交
493
      tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%"PRIx64, pRequest->self,
494
          TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
495
    }
496

H
Haojun Liao 已提交
497
    taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
498 499
  }

500 501 502 503 504 505 506 507 508 509
  SDataBuf buf = {.len = pMsg->contLen, .pData = NULL};

  if (pMsg->contLen > 0) {
    buf.pData = calloc(1, pMsg->contLen);
    if (buf.pData == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    }
510 511
  }

512
  pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
513
  rpcFreeCont(pMsg->pCont);
514
  destroySendMsgInfo(pSendInfo);
515
}
516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541

TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port) {
  tscDebug("try to connect to %s:%u by auth, user:%s db:%s", ip, port, user, db);
  if (user == NULL) {
    user = TSDB_DEFAULT_USER;
  }

  if (auth == NULL) {
    tscError("No auth info is given, failed to connect to server");
    return NULL;
  }

  return taos_connect_internal(ip, user, NULL, auth, db, port);
}

TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen, const char *db, int dbLen, uint16_t port) {
  char ipStr[TSDB_EP_LEN]      = {0};
  char dbStr[TSDB_DB_NAME_LEN] = {0};
  char userStr[TSDB_USER_LEN]  = {0};
  char passStr[TSDB_PASSWORD_LEN]   = {0};

  strncpy(ipStr,   ip,   MIN(TSDB_EP_LEN - 1, ipLen));
  strncpy(userStr, user, MIN(TSDB_USER_LEN - 1, userLen));
  strncpy(passStr, pass, MIN(TSDB_PASSWORD_LEN - 1, passLen));
  strncpy(dbStr,   db,   MIN(TSDB_DB_NAME_LEN - 1, dbLen));
  return taos_connect(ipStr, userStr, passStr, dbStr, port);
542 543 544 545
}

void* doFetchRow(SRequestObj* pRequest) {
  assert(pRequest != NULL);
546
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
547

H
Haojun Liao 已提交
548
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
549
    if (pRequest->type == TDMT_VND_QUERY) {
550 551 552 553 554
      // All data has returned to App already, no need to try again
      if (pResultInfo->completed) {
        return NULL;
      }

D
dapan1121 已提交
555
      scheduleFetchRows(pRequest->body.pQueryJob, (void **)&pRequest->body.resInfo.pData);
H
Haojun Liao 已提交
556 557
      setQueryResultByRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pRequest->body.resInfo.pData);

558
      if (pResultInfo->numOfRows == 0) {
D
dapan1121 已提交
559 560 561 562
        return NULL;
      }

      goto _return;
563
    } else if (pRequest->type == TDMT_MND_SHOW) {
H
Haojun Liao 已提交
564 565 566
      pRequest->type = TDMT_MND_SHOW_RETRIEVE;
    } else if (pRequest->type == TDMT_VND_SHOW_TABLES) {
      pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
H
Haojun Liao 已提交
567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589
    } else if (pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) {
      pRequest->type = TDMT_VND_SHOW_TABLES;
      SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
      pShowReqInfo->currentIndex += 1;
      if (pShowReqInfo->currentIndex >= taosArrayGetSize(pShowReqInfo->pArray)) {
        return NULL;
      }

      SVgroupInfo* pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex);
      SVShowTablesReq* pShowReq = calloc(1, sizeof(SVShowTablesReq));
      pShowReq->head.vgId = htonl(pVgroupInfo->vgId);

      pRequest->body.requestMsg.len = sizeof(SVShowTablesReq);
      pRequest->body.requestMsg.pData = pShowReq;

      SMsgSendInfo* body = buildMsgInfoImpl(pRequest);

      int64_t  transporterId = 0;
      STscObj *pTscObj = pRequest->pTscObj;
      asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
      tsem_wait(&pRequest->body.rspSem);

      pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
H
Haojun Liao 已提交
590
    }
H
Haojun Liao 已提交
591

592
    SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
H
Haojun Liao 已提交
593

594 595
    int64_t  transporterId = 0;
    STscObj *pTscObj = pRequest->pTscObj;
H
Haojun Liao 已提交
596
    asyncSendMsgToServer(pTscObj->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
H
Haojun Liao 已提交
597 598 599 600 601 602 603 604 605

    tsem_wait(&pRequest->body.rspSem);

    pResultInfo->current = 0;
    if (pResultInfo->numOfRows <= pResultInfo->current) {
      return NULL;
    }
  }

D
dapan1121 已提交
606 607
_return:

H
Haojun Liao 已提交
608 609 610 611 612
  for(int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
    pResultInfo->row[i] = pResultInfo->pCol[i] + pResultInfo->fields[i].bytes * pResultInfo->current;
    if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
      pResultInfo->length[i] = varDataLen(pResultInfo->row[i]);
      pResultInfo->row[i] = varDataVal(pResultInfo->row[i]);
613
    }
H
Haojun Liao 已提交
614 615 616 617 618 619
  }

  pResultInfo->current += 1;
  return pResultInfo->row;
}

H
Haojun Liao 已提交
620 621 622 623 624 625 626 627
static void doPrepareResPtr(SReqResultInfo* pResInfo) {
  if (pResInfo->row == NULL) {
    pResInfo->row    = calloc(pResInfo->numOfCols, POINTER_BYTES);
    pResInfo->pCol   = calloc(pResInfo->numOfCols, POINTER_BYTES);
    pResInfo->length = calloc(pResInfo->numOfCols, sizeof(int32_t));
  }
}

628
void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) {
H
Haojun Liao 已提交
629 630 631 632 633
  assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL);
  if (numOfRows == 0) {
    return;
  }

H
Haojun Liao 已提交
634 635 636
  // todo check for the failure of malloc
  doPrepareResPtr(pResultInfo);

H
Haojun Liao 已提交
637 638 639
  int32_t offset = 0;
  for (int32_t i = 0; i < numOfCols; ++i) {
    pResultInfo->length[i] = pResultInfo->fields[i].bytes;
H
Haojun Liao 已提交
640
    pResultInfo->row[i]    = (char*) (pResultInfo->pData + offset * pResultInfo->numOfRows);
H
Haojun Liao 已提交
641 642
    pResultInfo->pCol[i]   = pResultInfo->row[i];
    offset += pResultInfo->fields[i].bytes;
643
  }
S
Shengliang Guan 已提交
644 645
}

646 647 648 649 650
char* getConnectionDB(STscObj* pObj) {
  char *p = NULL;
  pthread_mutex_lock(&pObj->mutex);
  p = strndup(pObj->db, tListLen(pObj->db));
  pthread_mutex_unlock(&pObj->mutex);
S
Shengliang Guan 已提交
651

652 653 654 655 656 657 658 659 660
  return p;
}

void setConnectionDB(STscObj* pTscObj, const char* db) {
  assert(db != NULL && pTscObj != NULL);
  pthread_mutex_lock(&pTscObj->mutex);
  tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
  pthread_mutex_unlock(&pTscObj->mutex);
}
S
Shengliang Guan 已提交
661

662 663 664 665
void setQueryResultByRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp) {
  assert(pResultInfo != NULL && pRsp != NULL);

  pResultInfo->pRspMsg   = (const char*) pRsp;
H
Haojun Liao 已提交
666 667
  pResultInfo->pData     = (void*) pRsp->data;
  pResultInfo->numOfRows = htonl(pRsp->numOfRows);
668 669
  pResultInfo->current   = 0;
  pResultInfo->completed = (pRsp->completed == 1);
H
Haojun Liao 已提交
670 671 672

  setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows);
}