clientImpl.c 26.9 KB
Newer Older
1

2
#include "clientInt.h"
3
#include "clientLog.h"
4
#include "command.h"
5
#include "scheduler.h"
H
Haojun Liao 已提交
6
#include "tdatablock.h"
7
#include "tdef.h"
8
#include "tglobal.h"
9
#include "tmsgtype.h"
H
Haojun Liao 已提交
10
#include "tpagedbuf.h"
11
#include "tref.h"
X
Xiaoyu Wang 已提交
12

S
Shengliang Guan 已提交
13
static int32_t       initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
dengyihao's avatar
dengyihao 已提交
14 15
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest);
static void          destroySendMsgInfo(SMsgSendInfo* pMsgBody);
H
Haojun Liao 已提交
16

17
/*
 * Check that a string is present and its length lies in [1, maxsize].
 *
 * @param str      string to check; NULL is rejected
 * @param maxsize  maximum accepted length, excluding the terminating NUL
 * @return true when str is non-NULL, non-empty and no longer than maxsize
 */
static bool stringLengthCheck(const char* str, size_t maxsize) {
  if (str == NULL) {
    return false;
  }

  // size_t is unsigned, so "empty" is exactly len == 0
  size_t len = strlen(str);
  return len != 0 && len <= maxsize;
}

dengyihao's avatar
dengyihao 已提交
30
/* A user name is valid when it is non-empty and fits in TSDB_USER_LEN - 1 characters. */
static bool validateUserName(const char* user) {
  const size_t maxUserLen = TSDB_USER_LEN - 1;
  return stringLengthCheck(user, maxUserLen);
}
31

dengyihao's avatar
dengyihao 已提交
32
/* A password is valid when it is non-empty and fits in TSDB_PASSWORD_LEN - 1 characters. */
static bool validatePassword(const char* passwd) {
  const size_t maxPasswdLen = TSDB_PASSWORD_LEN - 1;
  return stringLengthCheck(passwd, maxPasswdLen);
}
33

dengyihao's avatar
dengyihao 已提交
34
/* A database name is valid when it is non-empty and fits in TSDB_DB_NAME_LEN - 1 characters. */
static bool validateDbName(const char* db) {
  const size_t maxDbNameLen = TSDB_DB_NAME_LEN - 1;
  return stringLengthCheck(db, maxDbNameLen);
}
35

36 37 38 39 40 41
/*
 * Build the "user:auth:ip:port" key that identifies an application instance.
 *
 * @param ip  may be NULL (the caller passes NULL when no explicit host was
 *            given); it is rendered as "(null)" so the key text matches what
 *            glibc's printf produced before, without passing NULL to %s,
 *            which is undefined behaviour.
 * @return heap-allocated key the caller must free, or NULL if strdup fails.
 *         Keys longer than the 511-character buffer are truncated.
 */
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
  char key[512] = {0};
  snprintf(key, sizeof(key), "%s:%s:%s:%d", user, auth, (ip != NULL) ? ip : "(null)", port);
  return strdup(key);
}

dengyihao's avatar
dengyihao 已提交
42 43 44
static STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
                                SAppInstInfo* pAppInfo);
static void     setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
45

dengyihao's avatar
dengyihao 已提交
46 47
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
                            uint16_t port) {
48 49 50 51
  if (taos_init() != TSDB_CODE_SUCCESS) {
    return NULL;
  }

52 53 54 55 56
  if (!validateUserName(user)) {
    terrno = TSDB_CODE_TSC_INVALID_USER_LENGTH;
    return NULL;
  }

57
  char localDb[TSDB_DB_NAME_LEN] = {0};
H
Haojun Liao 已提交
58
  if (db != NULL && strlen(db) > 0) {
dengyihao's avatar
dengyihao 已提交
59
    if (!validateDbName(db)) {
60 61 62 63
      terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH;
      return NULL;
    }

64 65
    tstrncpy(localDb, db, sizeof(localDb));
    strdequote(localDb);
66 67
  }

68
  char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
69 70 71 72 73 74
  if (auth == NULL) {
    if (!validatePassword(pass)) {
      terrno = TSDB_CODE_TSC_INVALID_PASS_LENGTH;
      return NULL;
    }

dengyihao's avatar
dengyihao 已提交
75
    taosEncryptPass_c((uint8_t*)pass, strlen(pass), secretEncrypt);
76 77 78 79
  } else {
    tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt));
  }

80
  SCorEpSet epSet = {0};
S
Shengliang Guan 已提交
81 82 83 84 85 86 87 88 89 90 91 92 93
  if (ip) {
    if (initEpSetFromCfg(ip, NULL, &epSet) < 0) {
      return NULL;
    }

    if (port) {
      epSet.epSet.eps[0].port = port;
    }
  } else {
    if (initEpSetFromCfg(tsFirst, tsSecond, &epSet) < 0) {
      return NULL;
    }
  }
94

dengyihao's avatar
dengyihao 已提交
95
  char*          key = getClusterKey(user, secretEncrypt, ip, port);
H
Haojun Liao 已提交
96
  SAppInstInfo** pInst = NULL;
97

wafwerar's avatar
wafwerar 已提交
98
  taosThreadMutexLock(&appInfo.mutex);
H
Haojun Liao 已提交
99 100

  pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
L
Liu Jicong 已提交
101
  SAppInstInfo* p = NULL;
102
  if (pInst == NULL) {
wafwerar's avatar
wafwerar 已提交
103
    p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo));
dengyihao's avatar
dengyihao 已提交
104
    p->mgmtEp = epSet;
H
Haojun Liao 已提交
105
    p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
D
dapan 已提交
106
    p->pAppHbMgr = appHbMgrInit(p, key);
H
Haojun Liao 已提交
107
    taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
108

H
Haojun Liao 已提交
109
    pInst = &p;
110 111
  }

wafwerar's avatar
wafwerar 已提交
112
  taosThreadMutexUnlock(&appInfo.mutex);
H
Haojun Liao 已提交
113

wafwerar's avatar
wafwerar 已提交
114
  taosMemoryFreeClear(key);
H
Haojun Liao 已提交
115
  return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst);
116 117
}

dengyihao's avatar
dengyihao 已提交
118
/*
 * Create a request object for a SQL statement and attach a lower-cased,
 * NUL-terminated copy of the statement to it.
 *
 * @param pTscObj   connection the request belongs to
 * @param sql       statement text; sqlLen bytes are read
 * @param sqlLen    length of sql in bytes
 * @param pRequest  out: the created request. It is also set (and left
 *                  allocated) when the SQL buffer allocation fails, so the
 *                  caller still owns it on that error path.
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY on allocation failure
 */
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest) {
  *pRequest = createRequest(pTscObj, NULL, NULL, TSDB_SQL_SELECT);
  if (*pRequest == NULL) {
    tscError("failed to malloc sqlObj");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  SRequestObj* pReq = *pRequest;

  pReq->sqlstr = taosMemoryMalloc(sqlLen + 1);
  if (pReq->sqlstr == NULL) {
    tscError("0x%" PRIx64 " failed to prepare sql string buffer", pReq->self);
    // NOTE(review): overwrites msgBuf; confirm createRequest has not already allocated it
    pReq->msgBuf = strdup("failed to prepare sql string buffer");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  strntolower(pReq->sqlstr, sql, (int32_t)sqlLen);
  pReq->sqlstr[sqlLen] = 0;
  pReq->sqlLen = sqlLen;

  tscDebugL("0x%" PRIx64 " SQL: %s, reqId:0x%" PRIx64, pReq->self, pReq->sqlstr, pReq->requestId);
  return TSDB_CODE_SUCCESS;
}
139

X
Xiaoyu Wang 已提交
140
/*
 * Parse the SQL text held by a request into a query object.
 *
 * On success, the result schema (if the query has a result set) is copied
 * into the request, and the db/table lists collected by the parser are
 * swapped into the request.
 *
 * @param pRequest    request holding sqlstr/sqlLen/msgBuf
 * @param topicQuery  forwarded to the parser context
 * @param pQuery      out: parsed query, set by qParseQuerySql
 * @return TSDB_CODE_SUCCESS or the error from catalogGetHandle / qParseQuerySql
 */
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery) {
  STscObj* pTscObj = pRequest->pTscObj;

  SParseContext cxt = {
      .requestId = pRequest->requestId,
      .acctId = pTscObj->acctId,
      .db = pRequest->pDb,
      .topicQuery = topicQuery,
      .pSql = pRequest->sqlstr,
      .sqlLen = pRequest->sqlLen,
      .pMsg = pRequest->msgBuf,
      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
      .pTransporter = pTscObj->pAppInfo->pTransporter,
  };

  cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  code = qParseQuerySql(&cxt, pQuery);
  if (TSDB_CODE_SUCCESS == code) {
    if ((*pQuery)->haveResultSet) {
      setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols);
    }

    // hand the referenced db/table lists over to the request
    TSWAP(pRequest->dbList, (*pQuery)->pDbList, SArray*);
    TSWAP(pRequest->tableList, (*pQuery)->pTableList, SArray*);
  }

  return code;
}
H
Haojun Liao 已提交
173

174 175 176 177
/*
 * Execute a command locally through qExecCommand and, when it produced a
 * response, install that response as the request's result set.
 *
 * @return TSDB_CODE_SUCCESS or the error from qExecCommand / setQueryResultFromRsp
 */
int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
  SRetrieveTableRsp* pRsp = NULL;
  int32_t            code = qExecCommand(pQuery->pRoot, &pRsp);
  if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
    code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false);
  }
  return code;
}

X
Xiaoyu Wang 已提交
183
/*
 * Send the command message prepared by the parser to the server and block
 * until the response semaphore is posted.
 *
 * @return always TSDB_CODE_SUCCESS; this function does not inspect the
 *         outcome of the send or of the response
 */
int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
  // drop table if exists not_exists_table
  if (NULL == pQuery->pCmdMsg) {
    return TSDB_CODE_SUCCESS;
  }

  SCmdMsgInfo* pMsgInfo = pQuery->pCmdMsg;
  pRequest->type = pMsgInfo->msgType;
  pRequest->body.requestMsg = (SDataBuf){.pData = pMsgInfo->pMsg, .len = pMsgInfo->msgLen, .handle = NULL};
  pMsgInfo->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management

  STscObj*      pTscObj = pRequest->pTscObj;
  SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);

  if (pMsgInfo->msgType == TDMT_VND_SHOW_TABLES) {
    SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
    if (pShowReqInfo->pArray == NULL) {
      pShowReqInfo->currentIndex = 0;  // set the first vnode/ then iterate the next vnode
      pShowReqInfo->pArray = pMsgInfo->pExtension;
    }
  }

  int64_t transporterId = 0;
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg);

  // wait for the response callback to post the semaphore
  tsem_wait(&pRequest->body.rspSem);
  return TSDB_CODE_SUCCESS;
}
X
Xiaoyu Wang 已提交
210

211 212
/*
 * Build the query plan for a parsed query.
 *
 * Also records the query's message type on the request.
 *
 * @param pPlan      out: plan created by qCreateQueryPlan
 * @param pNodeList  forwarded to qCreateQueryPlan
 * @return the result code of qCreateQueryPlan
 */
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
  pRequest->type = pQuery->msgType;

  SPlanContext cxt = {
    .queryId = pRequest->requestId,
    .acctId = pRequest->pTscObj->acctId,
    .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
    .pAstRoot = pQuery->pRoot,
    .showRewrite = pQuery->showRewrite
  };

  return qCreateQueryPlan(&cxt, pPlan, pNodeList);
}

227 228
/*
 * Fill the result-info field descriptors from a schema array.
 *
 * Two arrays are built: `fields` keeps the schema byte widths unchanged,
 * while `userFields` reports VARCHAR widths without the var-string header and
 * NCHAR widths as (bytes - header) / TSDB_NCHAR_SIZE.
 *
 * On allocation failure nothing is dereferenced: both arrays are released,
 * numOfCols is set to 0 and terrno is set to TSDB_CODE_TSC_OUT_OF_MEMORY.
 */
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols) {
  assert(pSchema != NULL && numOfCols > 0);

  pResInfo->numOfCols = numOfCols;
  pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
  pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));

  if (pResInfo->fields == NULL || pResInfo->userFields == NULL) {
    taosMemoryFreeClear(pResInfo->fields);
    taosMemoryFreeClear(pResInfo->userFields);
    pResInfo->numOfCols = 0;
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return;
  }

  for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
    pResInfo->fields[i].bytes = pSchema[i].bytes;
    pResInfo->fields[i].type  = pSchema[i].type;

    pResInfo->userFields[i].bytes = pSchema[i].bytes;
    pResInfo->userFields[i].type  = pSchema[i].type;

    if (pSchema[i].type == TSDB_DATA_TYPE_VARCHAR) {
      pResInfo->userFields[i].bytes -= VARSTR_HEADER_SIZE;
    } else if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR) {
      pResInfo->userFields[i].bytes = (pResInfo->userFields[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
    }

    tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
    tstrncpy(pResInfo->userFields[i].name, pSchema[i].name, tListLen(pResInfo->userFields[i].name));
  }
}

X
Xiaoyu Wang 已提交
252
/*
 * Run a query plan through the scheduler and record the outcome on the request.
 *
 * On scheduler failure the job (if one was created) is freed and the error is
 * stored in pRequest->code and terrno. For submit / create-table requests the
 * affected row count is copied from the scheduler result and the job is freed
 * immediately.
 *
 * @return pRequest->code after the call
 */
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
  void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;

  SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
  int32_t      code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr,
                                       pRequest->metric.start, &res);
  if (code != TSDB_CODE_SUCCESS) {
    if (pRequest->body.queryJob != 0) {
      schedulerFreeJob(pRequest->body.queryJob);
    }

    pRequest->code = code;
    terrno = code;
    return pRequest->code;
  }

  if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_CREATE_TABLE == pRequest->type) {
    pRequest->body.resInfo.numOfRows = res.numOfRows;

    // NOTE(review): queryJob is not reset after being freed; confirm no later path frees it again
    if (pRequest->body.queryJob != 0) {
      schedulerFreeJob(pRequest->body.queryJob);
    }
  }

  pRequest->code = res.code;
  terrno = res.code;
  return pRequest->code;
}
X
Xiaoyu Wang 已提交
279

D
dapan1121 已提交
280
/*
 * Build, parse and execute one SQL statement according to the parsed
 * query's execution mode.
 *
 * @return the request object (its code field carries the outcome), or NULL
 *         when the request object itself could not be created
 */
SRequestObj* execQueryImpl(STscObj* pTscObj, const char* sql, int sqlLen) {
  SRequestObj* pRequest = NULL;
  SQuery*      pQuery = NULL;
  SArray*      pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));

  int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest);
  if (TSDB_CODE_SUCCESS == code) {
    code = parseSql(pRequest, false, &pQuery);
  }

  if (TSDB_CODE_SUCCESS == code) {
    switch (pQuery->execMode) {
      case QUERY_EXEC_MODE_LOCAL:
        code = execLocalCmd(pRequest, pQuery);
        break;
      case QUERY_EXEC_MODE_RPC:
        code = execDdlQuery(pRequest, pQuery);
        break;
      case QUERY_EXEC_MODE_SCHEDULE:
        code = getPlan(pRequest, pQuery, &pRequest->body.pDag, pNodeList);
        if (TSDB_CODE_SUCCESS == code) {
          code = scheduleQuery(pRequest, pRequest->body.pDag, pNodeList);
        }
        break;
      case QUERY_EXEC_MODE_EMPTY_RESULT:
        pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
        break;
      default:
        break;
    }
  }

  taosArrayDestroy(pNodeList);
  qDestroyQuery(pQuery);
  if (NULL != pRequest && TSDB_CODE_SUCCESS != code) {
    // terrno keeps precedence as before; fall back to the returned code so a
    // failure that never set terrno is not recorded as success
    pRequest->code = (TSDB_CODE_SUCCESS != terrno) ? terrno : code;
  }

  return pRequest;
}

D
dapan1121 已提交
321 322 323 324 325 326 327 328 329
/*
 * Refresh the catalog entries (db vgroup info and table meta) referenced by a
 * request, stopping at the first failure.
 *
 * @return TSDB_CODE_QRY_APP_ERROR when the request references neither a db
 *         nor a table; otherwise the first error encountered, or the code of
 *         the last refresh call
 */
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
  SCatalog* pCatalog = NULL;
  int32_t   code = 0;
  int32_t   dbNum = taosArrayGetSize(pRequest->dbList);
  int32_t   tblNum = taosArrayGetSize(pRequest->tableList);

  if (dbNum <= 0 && tblNum <= 0) {
    return TSDB_CODE_QRY_APP_ERROR;
  }

  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SEpSet epset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);

  for (int32_t i = 0; i < dbNum; ++i) {
    char* dbFName = taosArrayGet(pRequest->dbList, i);

    code = catalogRefreshDBVgInfo(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, dbFName);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  for (int32_t i = 0; i < tblNum; ++i) {
    SName* tableName = taosArrayGet(pRequest->tableList, i);

    code = catalogRefreshTableMeta(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, tableName, -1);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  return code;
}


/*
 * Execute a SQL statement, retrying up to REQUEST_MAX_TRY_TIMES when the
 * failure is one the client can handle by refreshing catalog meta.
 *
 * @return the request of the last attempt (never a destroyed one), or NULL
 *         when no request object could be created
 */
SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
  SRequestObj* pRequest = NULL;
  int32_t      retryNum = 0;
  int32_t      code = 0;

  while (retryNum++ < REQUEST_MAX_TRY_TIMES) {
    pRequest = execQueryImpl(pTscObj, sql, sqlLen);
    if (pRequest == NULL) {
      // request creation failed; there is nothing to inspect or retry
      break;
    }

    if (TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) {
      break;
    }

    code = refreshMeta(pTscObj, pRequest);
    if (code) {
      pRequest->code = code;
      break;
    }

    if (retryNum >= REQUEST_MAX_TRY_TIMES) {
      // out of retries: hand the failed request back instead of destroying
      // it and returning a dangling pointer
      break;
    }

    destroyRequest(pRequest);
  }

  return pRequest;
}

/*
 * Run a SQL statement of explicit length on a connection.
 *
 * Statements longer than TSDB_MAX_ALLOWED_SQL_LEN are rejected with terrno
 * set to TSDB_CODE_TSC_EXCEED_SQL_LIMIT. The comparison is done in size_t, so
 * a negative sqlLen is rejected by the same check.
 */
TAOS_RES* taos_query_l(TAOS* taos, const char* sql, int sqlLen) {
  const size_t maxSqlLen = (size_t)TSDB_MAX_ALLOWED_SQL_LEN;

  if ((size_t)sqlLen <= maxSqlLen) {
    return execQuery((STscObj*)taos, sql, sqlLen);
  }

  tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
  terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
  return NULL;
}

S
Shengliang Guan 已提交
394 395
/*
 * Initialise a management end-point set from up to two "fqdn:port" strings.
 *
 * Each non-NULL, non-empty end point is appended in order (first, then
 * second). An end point of TSDB_EP_LEN characters or more is rejected.
 *
 * @return 0 on success; -1 with terrno = TSDB_CODE_TSC_INVALID_FQDN when an
 *         end point is too long or none was supplied
 */
int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
  pEpSet->version = 0;

  // init mnode ip set
  SEpSet* mgmtEpSet = &(pEpSet->epSet);
  mgmtEpSet->numOfEps = 0;
  mgmtEpSet->inUse = 0;

  const char* candidates[] = {firstEp, secondEp};
  for (int32_t i = 0; i < 2; ++i) {
    const char* ep = candidates[i];
    if (ep == NULL || ep[0] == 0) {
      continue;
    }

    if (strlen(ep) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
    }

    taosGetFqdnPortFromEp(ep, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
    mgmtEpSet->numOfEps++;
  }

  if (mgmtEpSet->numOfEps == 0) {
    terrno = TSDB_CODE_TSC_INVALID_FQDN;
    return -1;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
430 431 432
/*
 * Create a connection object, send the connect message to the management
 * end point and wait for the response.
 *
 * @return the connected object, or NULL on failure. On a failed connect the
 *         reason is printed to stdout and the connection object is closed.
 */
STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
                         SAppInstInfo* pAppInfo) {
  STscObj* pTscObj = createTscObj(user, auth, db, pAppInfo);
  if (NULL == pTscObj) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return pTscObj;
  }

  SRequestObj* pRequest = createRequest(pTscObj, fp, param, TDMT_MND_CONNECT);
  if (pRequest == NULL) {
    destroyTscObj(pTscObj);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  SMsgSendInfo* body = buildConnectMsg(pRequest);
  if (body == NULL) {
    // buildConnectMsg has set terrno; nothing was sent, so waiting on the
    // response semaphore would never return
    destroyRequest(pRequest);
    taos_close(pTscObj);
    return NULL;
  }

  int64_t transporterId = 0;
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);

  tsem_wait(&pRequest->body.rspSem);
  if (pRequest->code != TSDB_CODE_SUCCESS) {
    const char* errorMsg =
        (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
    printf("failed to connect to server, reason: %s\n\n", errorMsg);

    destroyRequest(pRequest);
    taos_close(pTscObj);
    pTscObj = NULL;
  } else {
    tscDebug("0x%" PRIx64 " connection is opening, connId:%d, dnodeConn:%p, reqId:0x%" PRIx64, pTscObj->id,
             pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId);
    destroyRequest(pRequest);
  }

  return pTscObj;
}

dengyihao's avatar
dengyihao 已提交
468
/*
 * Build the send-info wrapper and serialized payload of a connect request.
 *
 * The payload carries the connection's current database (if any), the
 * process id, the start time and the application name.
 *
 * @return a heap-allocated SMsgSendInfo owning the serialized payload, or
 *         NULL with terrno = TSDB_CODE_TSC_OUT_OF_MEMORY on allocation failure
 */
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  pMsgSendInfo->msgType = TDMT_MND_CONNECT;

  pMsgSendInfo->requestObjRefId = pRequest->self;
  pMsgSendInfo->requestId = pRequest->requestId;
  pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)];
  pMsgSendInfo->param = pRequest;

  SConnectReq connectReq = {0};
  STscObj*    pObj = pRequest->pTscObj;

  char* db = getDbOfConnection(pObj);
  if (db != NULL) {
    tstrncpy(connectReq.db, db, sizeof(connectReq.db));
  }
  taosMemoryFreeClear(db);

  connectReq.pid = htonl(appInfo.pid);
  connectReq.startTime = htobe64(appInfo.startTime);
  tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));

  // first call with a NULL buffer only computes the serialized length
  int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
  void*   pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    taosMemoryFreeClear(pMsgSendInfo);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }
  tSerializeSConnectReq(pReq, contLen, &connectReq);

  pMsgSendInfo->msgInfo.len = contLen;
  pMsgSendInfo->msgInfo.pData = pReq;
  return pMsgSendInfo;
}

504
/* Release a send-info object together with the message payload it owns. */
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
  assert(pMsgBody != NULL);
  taosMemoryFreeClear(pMsgBody->msgInfo.pData);
  taosMemoryFreeClear(pMsgBody);
}
509

dengyihao's avatar
dengyihao 已提交
510
/*
 * Report whether msgType is one of the query-related responses (query,
 * fetch, res-ready, query-heartbeat). The first argument is not used.
 */
bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType) {
  return msgType == TDMT_VND_QUERY_RSP || msgType == TDMT_VND_FETCH_RSP || msgType == TDMT_VND_RES_READY_RSP ||
         msgType == TDMT_VND_QUERY_HEARTBEAT_RSP;
}
513

514
/*
 * Handle a server response: record timing and end-point updates on the
 * owning request, copy the payload into a client-owned buffer, invoke the
 * per-message callback, then release the RPC payload and the send-info.
 *
 * @param parent  not used here
 * @param pMsg    response message; pMsg->ahandle must be the SMsgSendInfo
 *                that was passed when the request was sent
 * @param pEpSet  optional updated end-point set reported with the response
 */
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->ahandle;
  assert(pMsg->ahandle != NULL);

  if (pSendInfo->requestObjRefId != 0) {
    SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
    // guard against a NULL result (presumably the request may already be gone)
    // instead of dereferencing it
    if (pRequest != NULL) {
      assert(pRequest->self == pSendInfo->requestObjRefId);

      pRequest->metric.rsp = taosGetTimestampUs();

      STscObj* pTscObj = pRequest->pTscObj;
      if (pEpSet) {
        if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, pEpSet)) {
          updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
        }
      }

      /*
       * There is not response callback function for submit response.
       * The actual inserted number of points is the first number.
       */
      // divide before narrowing so long-running requests do not overflow int32
      int32_t elapsedMs = (int32_t)((pRequest->metric.rsp - pRequest->metric.start) / 1000);
      if (pMsg->code == TSDB_CODE_SUCCESS) {
        tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%" PRIx64, pRequest->self,
                 TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsedMs, pRequest->requestId);
      } else {
        tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%" PRIx64, pRequest->self,
                 TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsedMs, pRequest->requestId);
      }

      taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
    }
  }

  SDataBuf buf = {.len = pMsg->contLen, .pData = NULL, .handle = pMsg->handle};

  if (pMsg->contLen > 0) {
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
    if (buf.pData == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    }
  }

  pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pSendInfo);
}
563

dengyihao's avatar
dengyihao 已提交
564
/*
 * Connect using a pre-encrypted auth string instead of a password.
 *
 * @param user  defaults to TSDB_DEFAULT_USER when NULL
 * @param auth  required; NULL is rejected
 * @return the connection, or NULL on failure
 */
TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) {
  if (user == NULL) {
    user = TSDB_DEFAULT_USER;
  }

  // ip and db are optional: never hand NULL to %s (undefined behaviour)
  tscDebug("try to connect to %s:%u by auth, user:%s db:%s", (ip != NULL) ? ip : "(null)", port, user,
           (db != NULL) ? db : "(null)");

  if (auth == NULL) {
    tscError("No auth info is given, failed to connect to server");
    return NULL;
  }

  return taos_connect_internal(ip, user, NULL, auth, db, port);
}

dengyihao's avatar
dengyihao 已提交
578 579 580
TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen,
                     const char* db, int dbLen, uint16_t port) {
  char ipStr[TSDB_EP_LEN] = {0};
581
  char dbStr[TSDB_DB_NAME_LEN] = {0};
dengyihao's avatar
dengyihao 已提交
582 583
  char userStr[TSDB_USER_LEN] = {0};
  char passStr[TSDB_PASSWORD_LEN] = {0};
584

dengyihao's avatar
dengyihao 已提交
585
  strncpy(ipStr, ip, TMIN(TSDB_EP_LEN - 1, ipLen));
dengyihao's avatar
dengyihao 已提交
586 587
  strncpy(userStr, user, TMIN(TSDB_USER_LEN - 1, userLen));
  strncpy(passStr, pass, TMIN(TSDB_PASSWORD_LEN - 1, passLen));
dengyihao's avatar
dengyihao 已提交
588
  strncpy(dbStr, db, TMIN(TSDB_DB_NAME_LEN - 1, dbLen));
589
  return taos_connect(ipStr, userStr, passStr, dbStr, port);
590 591
}

L
Liu Jicong 已提交
592
/*
 * Point row[i] (and, for variable-length columns, length[i]) at the value of
 * the current row in every column; NULL values yield row[i] == NULL.
 *
 * Variable-length columns are located through their offset array (-1 marks
 * NULL); fixed-length columns through the null bitmap and a fixed stride of
 * fields[i].bytes.
 */
void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
    SResultColumn* pCol = &pResultInfo->pCol[i];

    int32_t type = pResultInfo->fields[i].type;
    int32_t bytes = pResultInfo->fields[i].bytes;

    if (IS_VAR_DATA_TYPE(type)) {
      if (pCol->offset[pResultInfo->current] != -1) {
        char* pStart = pResultInfo->pCol[i].offset[pResultInfo->current] + pResultInfo->pCol[i].pData;

        pResultInfo->length[i] = varDataLen(pStart);
        pResultInfo->row[i] = varDataVal(pStart);
      } else {
        pResultInfo->row[i] = NULL;
      }
    } else {
      if (!colDataIsNull_f(pCol->nullbitmap, pResultInfo->current)) {
        pResultInfo->row[i] = pResultInfo->pCol[i].pData + bytes * pResultInfo->current;
      } else {
        pResultInfo->row[i] = NULL;
      }
    }
  }
}

618
/*
 * Return the row-pointer array for the next row of a request's result set,
 * fetching the next data block from the server when the current one is
 * exhausted.
 *
 * @param setupOneRowPtr  when true, row pointers are set for the current row
 *                        and the cursor advances by one
 * @param convertUcs4     forwarded to setQueryResultFromRsp for scheduler
 *                        (TDMT_VND_QUERY) results
 * @return pResultInfo->row, or NULL when there is no more data or a fetch
 *         failed (pRequest->code holds the error for the paths handled here)
 */
void* doFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
  assert(pRequest != NULL);
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;

  SEpSet epSet = {0};

  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
    if (pRequest->type == TDMT_VND_QUERY) {
      // All data has returned to App already, no need to try again
      if (pResultInfo->completed) {
        pResultInfo->numOfRows = 0;
        return NULL;
      }

      SReqResultInfo* pResInfo = &pRequest->body.resInfo;
      pRequest->code = schedulerFetchRows(pRequest->body.queryJob, (void**)&pResInfo->pData);
      if (pRequest->code != TSDB_CODE_SUCCESS) {
        pResultInfo->numOfRows = 0;
        return NULL;
      }

      pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData, convertUcs4);
      if (pRequest->code != TSDB_CODE_SUCCESS) {
        pResultInfo->numOfRows = 0;
        return NULL;
      }

      tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
               pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId);

      if (pResultInfo->numOfRows == 0) {
        return NULL;
      }

      goto _return;
    } else if (pRequest->type == TDMT_MND_SHOW) {
      pRequest->type = TDMT_MND_SHOW_RETRIEVE;
      epSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
    } else if (pRequest->type == TDMT_VND_SHOW_TABLES) {
      pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
      SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
      SVgroupInfo*  pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex);

      epSet = pVgroupInfo->epSet;
    } else if (pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) {
      // current vnode exhausted: issue "show tables" on the next vnode first
      pRequest->type = TDMT_VND_SHOW_TABLES;
      SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
      pShowReqInfo->currentIndex += 1;
      if (pShowReqInfo->currentIndex >= taosArrayGetSize(pShowReqInfo->pArray)) {
        return NULL;
      }

      SVgroupInfo*     pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex);
      SVShowTablesReq* pShowReq = taosMemoryCalloc(1, sizeof(SVShowTablesReq));
      if (pShowReq == NULL) {
        pRequest->code = TSDB_CODE_OUT_OF_MEMORY;
        terrno = TSDB_CODE_OUT_OF_MEMORY;
        return NULL;
      }
      pShowReq->head.vgId = htonl(pVgroupInfo->vgId);

      pRequest->body.requestMsg.len = sizeof(SVShowTablesReq);
      pRequest->body.requestMsg.pData = pShowReq;

      SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
      epSet = pVgroupInfo->epSet;

      int64_t  transporterId = 0;
      STscObj* pTscObj = pRequest->pTscObj;
      asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body);
      tsem_wait(&pRequest->body.rspSem);

      // then fall through to the common path below to fetch from that vnode
      pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
    } else if (pRequest->type == TDMT_MND_SHOW_RETRIEVE) {
      epSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);

      if (pResultInfo->completed) {
        return NULL;
      }
    }

    if (pResultInfo->completed) {
      pResultInfo->numOfRows = 0;
      return NULL;
    }

    SMsgSendInfo* body = buildMsgInfoImpl(pRequest);

    int64_t  transporterId = 0;
    STscObj* pTscObj = pRequest->pTscObj;
    asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body);

    tsem_wait(&pRequest->body.rspSem);

    pResultInfo->current = 0;
    if (pResultInfo->numOfRows <= pResultInfo->current) {
      return NULL;
    }
  }

_return:
  if (setupOneRowPtr) {
    doSetOneRowPtr(pResultInfo);
    pResultInfo->current += 1;
  }

  return pResultInfo->row;
}

722
/*
 * Lazily allocate the per-column bookkeeping arrays (row pointers, column
 * descriptors, lengths, conversion buffers) of a result-info object.
 *
 * Allocation is all-or-nothing: if any array cannot be allocated, the ones
 * that were are released and all four pointers are reset to NULL, so a later
 * call retries instead of seeing a half-initialised object.
 *
 * @return TSDB_CODE_SUCCESS or TSDB_CODE_OUT_OF_MEMORY
 */
static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
  if (pResInfo->row == NULL) {
    pResInfo->row    = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
    pResInfo->pCol   = taosMemoryCalloc(pResInfo->numOfCols, sizeof(SResultColumn));
    pResInfo->length = taosMemoryCalloc(pResInfo->numOfCols, sizeof(int32_t));
    pResInfo->convertBuf = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);

    if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL || pResInfo->convertBuf == NULL) {
      taosMemoryFreeClear(pResInfo->row);
      taosMemoryFreeClear(pResInfo->pCol);
      taosMemoryFreeClear(pResInfo->length);
      taosMemoryFreeClear(pResInfo->convertBuf);

      // make the reset explicit so the "row == NULL" guard above retries
      pResInfo->row = NULL;
      pResInfo->pCol = NULL;
      pResInfo->length = NULL;
      pResInfo->convertBuf = NULL;
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  return TSDB_CODE_SUCCESS;
}

737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772
/*
 * Convert every NCHAR column of the current data block from UCS-4 to the
 * native multi-byte encoding.
 *
 * For each NCHAR column the per-column conversion buffer is resized to
 * colLength[col]; every non-NULL value (offset != -1) is converted into that
 * buffer, its offset is rewritten to point into the buffer, and finally the
 * column's data pointer is switched to the buffer.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when a buffer cannot
 *         be resized
 */
static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int32_t numOfCols, int32_t* colLength) {
  for (int32_t col = 0; col < numOfCols; ++col) {
    if (pResultInfo->fields[col].type != TSDB_DATA_TYPE_NCHAR) {
      continue;
    }

    int32_t maxBytes = pResultInfo->fields[col].bytes;

    char* pDst = taosMemoryRealloc(pResultInfo->convertBuf[col], colLength[col]);
    if (pDst == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    pResultInfo->convertBuf[col] = pDst;

    SResultColumn* pColumn = &pResultInfo->pCol[col];
    for (int32_t row = 0; row < numOfRows; ++row) {
      if (pColumn->offset[row] == -1) {
        continue;  // NULL value, nothing to convert
      }

      char* pSrc = pColumn->pData + pColumn->offset[row];

      int32_t len = taosUcs4ToMbs((TdUcs4*)varDataVal(pSrc), varDataLen(pSrc), varDataVal(pDst));
      ASSERT(len <= maxBytes);

      varDataSetLen(pDst, len);
      pColumn->offset[row] = (pDst - pResultInfo->convertBuf[col]);
      pDst += (len + VARSTR_HEADER_SIZE);
    }

    pColumn->pData = pResultInfo->convertBuf[col];
    pResultInfo->row[col] = pColumn->pData;
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Set up the per-column pointers of a result-info object over the raw data
 * block in pResultInfo->pData.
 *
 * As read here, the block starts with numOfCols int32 column lengths (network
 * byte order, converted in place), followed per column by either an int32
 * offset array (variable-length types) or a null bitmap (fixed-length types)
 * and then the column data.
 *
 * @param pFields      must be non-NULL; column types are read from pResultInfo->fields
 * @param convertUcs4  when true, NCHAR columns are converted via doConvertUCS4
 * @return TSDB_CODE_SUCCESS (also when numOfRows is 0) or an out-of-memory error
 */
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows,
                         bool convertUcs4) {
  assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL);
  if (numOfRows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = doPrepareResPtr(pResultInfo);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  int32_t* colLength = (int32_t*)pResultInfo->pData;
  char*    pStart = ((char*)pResultInfo->pData) + sizeof(int32_t) * numOfCols;
  for (int32_t i = 0; i < numOfCols; ++i) {
    // converted in place: the block must not be processed twice
    colLength[i] = htonl(colLength[i]);

    if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
      pResultInfo->pCol[i].offset = (int32_t*)pStart;
      pStart += numOfRows * sizeof(int32_t);
    } else {
      pResultInfo->pCol[i].nullbitmap = pStart;
      // NOTE(review): uses pResultInfo->numOfRows while the offset branch uses
      // the numOfRows argument; confirm callers always pass the same value
      pStart += BitmapLen(pResultInfo->numOfRows);
    }

    pResultInfo->pCol[i].pData = pStart;
    pResultInfo->length[i] = pResultInfo->fields[i].bytes;
    pResultInfo->row[i] = pResultInfo->pCol[i].pData;

    pStart += colLength[i];
  }

  // convert UCS4-LE encoded character to native multi-bytes character in current data block.
  if (convertUcs4) {
    code = doConvertUCS4(pResultInfo, numOfRows, numOfCols, colLength);
  }

  return code;
}

H
Haojun Liao 已提交
811
/*
 * Return a heap-allocated copy of the connection's current database name,
 * read under the connection mutex.
 *
 * @return the copy (caller frees), or NULL when no database is set or the
 *         copy could not be allocated
 */
char* getDbOfConnection(STscObj* pObj) {
  char* p = NULL;
  taosThreadMutexLock(&pObj->mutex);
  size_t len = strlen(pObj->db);
  if (len > 0) {
    p = strndup(pObj->db, tListLen(pObj->db));
  }

  taosThreadMutexUnlock(&pObj->mutex);
  return p;
}

/*
 * Set the connection's current database name under the connection mutex.
 * The name is copied with tstrncpy, bounded by the size of pTscObj->db.
 */
void setConnectionDB(STscObj* pTscObj, const char* db) {
  assert(db != NULL && pTscObj != NULL);
  taosThreadMutexLock(&pTscObj->mutex);
  tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
  taosThreadMutexUnlock(&pTscObj->mutex);
}
S
Shengliang Guan 已提交
829

H
Haojun Liao 已提交
830 831 832 833 834 835 836 837 838 839
/* Clear the connection's current database name under the connection mutex; a NULL connection is ignored. */
void resetConnectDB(STscObj* pTscObj) {
  if (pTscObj != NULL) {
    taosThreadMutexLock(&pTscObj->mutex);
    pTscObj->db[0] = 0;
    taosThreadMutexUnlock(&pTscObj->mutex);
  }
}

840
/*
 * Install a retrieve-table response as the current data block of a
 * result-info object and set up its column pointers.
 *
 * Row count and payload length are converted from network byte order, the
 * cursor is reset to 0, and totalRows is increased by the block's row count.
 *
 * @param convertUcs4  forwarded to setResultDataPtr
 * @return the result code of setResultDataPtr
 */
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4) {
  assert(pResultInfo != NULL && pRsp != NULL);

  pResultInfo->pRspMsg    = (const char*)pRsp;
  pResultInfo->pData      = (void*)pRsp->data;
  pResultInfo->numOfRows  = htonl(pRsp->numOfRows);
  pResultInfo->current    = 0;
  pResultInfo->completed  = (pRsp->completed == 1);
  pResultInfo->payloadLen = htonl(pRsp->compLen);
  pResultInfo->precision  = pRsp->precision;

  // TODO handle the compressed case
  pResultInfo->totalRows += pResultInfo->numOfRows;
  return setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows, convertUcs4);
}