clientImpl.c 22.5 KB
Newer Older
1

2
#include "clientInt.h"
3
#include "clientLog.h"
4 5 6
#include "parser.h"
#include "planner.h"
#include "scheduler.h"
H
Haojun Liao 已提交
7
#include "tdatablock.h"
8
#include "tdef.h"
9
#include "tglobal.h"
10
#include "tmsgtype.h"
H
Haojun Liao 已提交
11
#include "tpagedbuf.h"
12
#include "tref.h"
X
Xiaoyu Wang 已提交
13

S
Shengliang Guan 已提交
14
static int32_t       initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
dengyihao's avatar
dengyihao 已提交
15 16 17
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest);
static void          destroySendMsgInfo(SMsgSendInfo* pMsgBody);
static void          setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp);
H
Haojun Liao 已提交
18

19
/**
 * Check that a C string is present, non-empty and no longer than maxsize.
 *
 * @param str      NUL-terminated string to check; may be NULL.
 * @param maxsize  maximum accepted length in bytes, excluding the terminator.
 * @return true when 1 <= strlen(str) <= maxsize; false otherwise (including NULL).
 */
static bool stringLengthCheck(const char* str, size_t maxsize) {
  if (str == NULL) {
    return false;
  }

  // size_t is unsigned, so the only length "below 1" is exactly 0.
  size_t len = strlen(str);
  return len != 0 && len <= maxsize;
}

dengyihao's avatar
dengyihao 已提交
32
/* A user name is accepted when it is non-NULL, non-empty and at most TSDB_USER_LEN - 1 bytes long. */
static bool validateUserName(const char* user) {
  const size_t maxLen = TSDB_USER_LEN - 1;
  return stringLengthCheck(user, maxLen);
}
33

dengyihao's avatar
dengyihao 已提交
34
/* A password is accepted when it is non-NULL, non-empty and at most TSDB_PASSWORD_LEN - 1 bytes long. */
static bool validatePassword(const char* passwd) {
  const size_t maxLen = TSDB_PASSWORD_LEN - 1;
  return stringLengthCheck(passwd, maxLen);
}
35

dengyihao's avatar
dengyihao 已提交
36
/* A database name is accepted when it is non-NULL, non-empty and at most TSDB_DB_NAME_LEN - 1 bytes long. */
static bool validateDbName(const char* db) {
  const size_t maxLen = TSDB_DB_NAME_LEN - 1;
  return stringLengthCheck(db, maxLen);
}
37

38 39 40 41 42 43
/**
 * Build the "user:auth:ip:port" string used as the key of the per-cluster
 * application instance map.
 *
 * A NULL user, auth or ip is rendered as an empty field, because passing NULL
 * to a %s conversion is undefined behaviour. The result is sized exactly, so
 * long components are never truncated.
 *
 * @return heap-allocated NUL-terminated key owned by the caller, or NULL when
 *         formatting or allocation fails.
 */
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
  const char* userPart = (user != NULL) ? user : "";
  const char* authPart = (auth != NULL) ? auth : "";
  const char* ipPart = (ip != NULL) ? ip : "";

  // First pass only measures the formatted length.
  int len = snprintf(NULL, 0, "%s:%s:%s:%d", userPart, authPart, ipPart, (int)port);
  if (len < 0) {
    return NULL;
  }

  char* key = malloc((size_t)len + 1);
  if (key == NULL) {
    return NULL;
  }

  snprintf(key, (size_t)len + 1, "%s:%s:%s:%d", userPart, authPart, ipPart, (int)port);
  return key;
}

dengyihao's avatar
dengyihao 已提交
44 45 46
static STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
                                SAppInstInfo* pAppInfo);
static void     setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
47

dengyihao's avatar
dengyihao 已提交
48 49
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
                            uint16_t port) {
50 51 52 53
  if (taos_init() != TSDB_CODE_SUCCESS) {
    return NULL;
  }

54 55 56 57 58
  if (!validateUserName(user)) {
    terrno = TSDB_CODE_TSC_INVALID_USER_LENGTH;
    return NULL;
  }

59
  char localDb[TSDB_DB_NAME_LEN] = {0};
60
  if (db != NULL) {
dengyihao's avatar
dengyihao 已提交
61
    if (!validateDbName(db)) {
62 63 64 65
      terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH;
      return NULL;
    }

66 67
    tstrncpy(localDb, db, sizeof(localDb));
    strdequote(localDb);
68 69
  }

70
  char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
71 72 73 74 75 76
  if (auth == NULL) {
    if (!validatePassword(pass)) {
      terrno = TSDB_CODE_TSC_INVALID_PASS_LENGTH;
      return NULL;
    }

dengyihao's avatar
dengyihao 已提交
77
    taosEncryptPass_c((uint8_t*)pass, strlen(pass), secretEncrypt);
78 79 80 81
  } else {
    tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt));
  }

82
  SCorEpSet epSet = {0};
S
Shengliang Guan 已提交
83 84 85 86 87 88 89 90 91 92 93 94 95
  if (ip) {
    if (initEpSetFromCfg(ip, NULL, &epSet) < 0) {
      return NULL;
    }

    if (port) {
      epSet.epSet.eps[0].port = port;
    }
  } else {
    if (initEpSetFromCfg(tsFirst, tsSecond, &epSet) < 0) {
      return NULL;
    }
  }
96

dengyihao's avatar
dengyihao 已提交
97
  char*          key = getClusterKey(user, secretEncrypt, ip, port);
H
Haojun Liao 已提交
98
  SAppInstInfo** pInst = NULL;
99

H
Haojun Liao 已提交
100 101 102
  pthread_mutex_lock(&appInfo.mutex);

  pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
L
Liu Jicong 已提交
103
  SAppInstInfo* p = NULL;
104
  if (pInst == NULL) {
L
Liu Jicong 已提交
105
    p = calloc(1, sizeof(struct SAppInstInfo));
dengyihao's avatar
dengyihao 已提交
106
    p->mgmtEp = epSet;
H
Haojun Liao 已提交
107
    p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
D
dapan 已提交
108
    p->pAppHbMgr = appHbMgrInit(p, key);
H
Haojun Liao 已提交
109
    taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
110

H
Haojun Liao 已提交
111
    pInst = &p;
112 113
  }

H
Haojun Liao 已提交
114 115
  pthread_mutex_unlock(&appInfo.mutex);

H
Haojun Liao 已提交
116
  tfree(key);
H
Haojun Liao 已提交
117
  return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst);
118 119
}

dengyihao's avatar
dengyihao 已提交
120
/**
 * Create a request object for a SQL statement and attach a lower-cased,
 * NUL-terminated copy of the statement to it.
 *
 * @param pTscObj   connection the request belongs to.
 * @param sql       statement text; sqlLen bytes are read from it.
 * @param sqlLen    length of sql in bytes.
 * @param pRequest  out: receives the new request (NULL when creation failed).
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the request
 *         or its SQL buffer could not be allocated.
 */
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest) {
  SRequestObj* pReq = createRequest(pTscObj, NULL, NULL, TSDB_SQL_SELECT);
  *pRequest = pReq;
  if (NULL == pReq) {
    tscError("failed to malloc sqlObj");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pReq->sqlstr = malloc(sqlLen + 1);
  if (NULL == pReq->sqlstr) {
    tscError("0x%" PRIx64 " failed to prepare sql string buffer", pReq->self);
    // NOTE(review): this overwrites whatever msgBuf held before - confirm nothing is leaked.
    pReq->msgBuf = strdup("failed to prepare sql string buffer");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  strntolower(pReq->sqlstr, sql, (int32_t)sqlLen);
  pReq->sqlstr[sqlLen] = 0;
  pReq->sqlLen = sqlLen;

  tscDebugL("0x%" PRIx64 " SQL: %s, reqId:0x%" PRIx64, pReq->self, pReq->sqlstr, pReq->requestId);
  return TSDB_CODE_SUCCESS;
}
141

X
Xiaoyu Wang 已提交
142
/**
 * Parse the SQL text stored in the request.
 *
 * Builds a parse context from the request and its connection, obtains the
 * catalog handle for the cluster, and runs qParseQuerySql(). When parsing
 * succeeds and the query has a result set, the result schema is copied into
 * the request's result info.
 *
 * @param pRequest  request carrying the SQL string and message buffer.
 * @param pQuery    out: parsed query produced by qParseQuerySql().
 * @return the code from catalogGetHandle() if it failed, otherwise the code
 *         from qParseQuerySql().
 */
int32_t parseSql(SRequestObj* pRequest, SQuery** pQuery) {
  STscObj* pTscObj = pRequest->pTscObj;

  SParseContext cxt = {
      .requestId = pRequest->requestId,
      .acctId = pTscObj->acctId,
      .db = getDbOfConnection(pTscObj),
      .pSql = pRequest->sqlstr,
      .sqlLen = pRequest->sqlLen,
      .pMsg = pRequest->msgBuf,
      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
      .pTransporter = pTscObj->pAppInfo->pTransporter,
  };

  cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);

  int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
  if (TSDB_CODE_SUCCESS == code) {
    code = qParseQuerySql(&cxt, pQuery);
    if (TSDB_CODE_SUCCESS == code && (*pQuery)->haveResultSet) {
      setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols);
    }
  }

  // cxt.db is the heap copy returned by getDbOfConnection(); release it on every path.
  tfree(cxt.db);
  return code;
}
H
Haojun Liao 已提交
171

X
Xiaoyu Wang 已提交
172
/**
 * Send the pre-built command message of a parsed query to the server and
 * block until the request's response semaphore is posted.
 *
 * For TDMT_VND_SHOW_TABLES the show info of the request is initialised from
 * the command's extension on first use.
 *
 * @param pRequest  request to execute; its type and requestMsg are overwritten.
 * @param pQuery    parsed query whose pCmdMsg holds the message to send.
 * @return always TSDB_CODE_SUCCESS; this function does not inspect the response.
 */
int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
  SCmdMsgInfo* pCmd = pQuery->pCmdMsg;

  pRequest->type = pCmd->msgType;
  pRequest->body.requestMsg = (SDataBuf){.pData = pCmd->pMsg, .len = pCmd->msgLen, .handle = NULL};
  pCmd->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management

  STscObj*      pTscObj = pRequest->pTscObj;
  SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);

  if (TDMT_VND_SHOW_TABLES == pCmd->msgType) {
    SShowReqInfo* pShow = &pRequest->body.showInfo;
    if (NULL == pShow->pArray) {
      pShow->currentIndex = 0;  // start with the first vnode, then iterate to the next
      pShow->pArray = pCmd->pExtension;
    }
  }

  int64_t transporterId = 0;
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pCmd->epSet, &transporterId, pSendMsg);

  tsem_wait(&pRequest->body.rspSem);
  return TSDB_CODE_SUCCESS;
}
X
Xiaoyu Wang 已提交
194

195 196
/**
 * Record the query's message type on the request and create the query plan.
 *
 * @param pRequest   request being executed; its type is set from pQuery->msgType.
 * @param pQuery     parsed query providing the AST root.
 * @param pPlan      out: plan produced by qCreateQueryPlan().
 * @param pNodeList  node list handed to qCreateQueryPlan().
 * @return the code returned by qCreateQueryPlan().
 */
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
  pRequest->type = pQuery->msgType;

  SPlanContext cxt = {
      .queryId = pRequest->requestId,
      .pAstRoot = pQuery->pRoot,
      .acctId = pRequest->pTscObj->acctId,
  };

  return qCreateQueryPlan(&cxt, pPlan, pNodeList);
}

205 206
/**
 * Allocate the result field descriptors of a request and fill them from a
 * result schema (bytes, type and name of every column).
 *
 * @param pResInfo   result info to populate; fields and numOfCols are overwritten.
 * @param pSchema    schema array with numOfCols entries; must not be NULL.
 * @param numOfCols  number of columns; must be > 0.
 *
 * On allocation failure numOfCols is set to 0, fields is left NULL and terrno
 * is set to TSDB_CODE_TSC_OUT_OF_MEMORY.
 */
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols) {
  assert(pSchema != NULL && numOfCols > 0);

  // Size the array by the destination element, not by the source schema entry.
  pResInfo->fields = calloc(numOfCols, sizeof(pResInfo->fields[0]));
  if (pResInfo->fields == NULL) {
    pResInfo->numOfCols = 0;
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return;
  }

  pResInfo->numOfCols = numOfCols;

  for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
    pResInfo->fields[i].bytes = pSchema[i].bytes;
    pResInfo->fields[i].type = pSchema[i].type;
    tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
  }
}

X
Xiaoyu Wang 已提交
218
/**
 * Execute a query plan through the scheduler and record the outcome on the request.
 *
 * When schedulerExecJob() fails, the job (if one was created) is freed and the
 * scheduler's code becomes the request code. When it succeeds, the request
 * code is taken from the query result; for submit and create-table requests
 * the affected row count is stored and the job is freed right away.
 * In both cases the result's error list is attached to the request.
 *
 * @return the code stored in pRequest->code.
 */
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
  void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;

  SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
  int32_t      code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr, &res);

  if (TSDB_CODE_SUCCESS == code) {
    if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_CREATE_TABLE == pRequest->type) {
      pRequest->body.resInfo.numOfRows = res.numOfRows;

      if (pRequest->body.queryJob != 0) {
        schedulerFreeJob(pRequest->body.queryJob);
      }
    }

    pRequest->errList = res.errList;
    pRequest->code = res.code;
  } else {
    if (pRequest->body.queryJob != 0) {
      schedulerFreeJob(pRequest->body.queryJob);
    }

    pRequest->errList = res.errList;
    pRequest->code = code;
  }

  return pRequest->code;
}
X
Xiaoyu Wang 已提交
244

D
dapan1121 已提交
245
/**
 * Run one attempt of a SQL statement: build the request, parse it, then either
 * send it directly (directRpc) or plan and schedule it.
 *
 * The node list and the parsed query are released before returning. If terrno
 * is not TSDB_CODE_SUCCESS at that point, it is copied into the request code.
 *
 * @return the request object; NULL if buildRequest() did not produce one.
 */
SRequestObj* execQueryImpl(STscObj* pTscObj, const char* sql, int sqlLen) {
  SRequestObj* pRequest = NULL;
  SQuery*      pQuery = NULL;
  SArray*      pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));

  terrno = TSDB_CODE_SUCCESS;
  CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
  CHECK_CODE_GOTO(parseSql(pRequest, &pQuery), _return);

  if (!pQuery->directRpc) {
    CHECK_CODE_GOTO(getPlan(pRequest, pQuery, &pRequest->body.pDag, pNodeList), _return);
    CHECK_CODE_GOTO(scheduleQuery(pRequest, pRequest->body.pDag, pNodeList), _return);
    pRequest->code = terrno;
  } else {
    CHECK_CODE_GOTO(execDdlQuery(pRequest, pQuery), _return);
  }

_return:
  taosArrayDestroy(pNodeList);
  qDestroyQuery(pQuery);

  if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
    pRequest->code = terrno;
  }

  return pRequest;
}

D
dapan1121 已提交
272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338
/**
 * Filter a request's error list so that only TSDB_CODE_VND_HASH_MISMATCH
 * entries remain. If nothing remains, the list is destroyed and *pList is set
 * to NULL.
 *
 * @param pList  in/out: pointer to the error list.
 * @return always TSDB_CODE_SUCCESS.
 */
int32_t clientProcessErrorList(SArray **pList) {
  SArray *errList = *pList;
  int32_t remaining = (int32_t)taosArrayGetSize(errList);
  int32_t idx = 0;

  while (idx < remaining) {
    SQueryErrorInfo *errInfo = taosArrayGet(errList, idx);

    if (TSDB_CODE_VND_HASH_MISMATCH != errInfo->code) {
      // Drop the entry; the next one slides into the same index.
      taosArrayRemove(errList, idx);
      --remaining;
      continue;
    }

    if (idx == (remaining - 1)) {
      break;
    }

    // TODO REMOVE SAME DB ERROR
    ++idx;
  }

  if (0 == remaining) {
    taosArrayDestroy(*pList);
    *pList = NULL;
  }

  return TSDB_CODE_SUCCESS;
}


/**
 * Execute a SQL statement, retrying after refreshing the vgroup information
 * of every database that reported TSDB_CODE_VND_HASH_MISMATCH.
 *
 * Each attempt runs execQueryImpl(). The loop stops when the attempt succeeds,
 * has no error list, has no hash-mismatch errors left after filtering, or when
 * refreshing the catalog fails. A request that is about to be retried is
 * destroyed first.
 *
 * @return the request of the last attempt, or NULL when execQueryImpl() could
 *         not create a request object.
 */
SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
  SRequestObj* pRequest = NULL;
  int32_t code = 0;
  bool quit = false;

  while (!quit) {
    pRequest = execQueryImpl(pTscObj, sql, sqlLen);
    if (NULL == pRequest) {
      // No request object was created, so there is nothing to inspect or retry.
      return NULL;
    }

    if (TSDB_CODE_SUCCESS == pRequest->code || NULL == pRequest->errList) {
      break;
    }

    code = clientProcessErrorList(&pRequest->errList);
    if (code != TSDB_CODE_SUCCESS || NULL == pRequest->errList) {
      break;
    }

    int32_t errNum = (int32_t)taosArrayGetSize(pRequest->errList);
    for (int32_t i = 0; i < errNum; ++i) {
      SQueryErrorInfo *errInfo = taosArrayGet(pRequest->errList, i);

      if (TSDB_CODE_VND_HASH_MISMATCH == errInfo->code) {
        SCatalog *pCatalog = NULL;
        code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
        if (code != TSDB_CODE_SUCCESS) {
          quit = true;
          break;
        }
        SEpSet epset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);

        char dbFName[TSDB_DB_FNAME_LEN];
        tNameGetFullDbName(&errInfo->tableName, dbFName);

        code = catalogRefreshDBVgInfo(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, dbFName);
        if (code != TSDB_CODE_SUCCESS) {
          quit = true;
          break;
        }
      }
    }

    // Keep the failed request only when giving up; otherwise it is replaced by the retry.
    if (!quit) {
      destroyRequest(pRequest);
    }
  }

  if (code) {
    pRequest->code = code;
  }

  return pRequest;
}

/**
 * Execute a SQL statement of explicit length on a connection.
 *
 * @param taos    connection handle (an STscObj).
 * @param sql     statement text.
 * @param sqlLen  length of sql in bytes.
 * @return the request produced by execQuery(), or NULL with terrno set to
 *         TSDB_CODE_TSC_EXCEED_SQL_LIMIT when the length check fails.
 */
TAOS_RES* taos_query_l(TAOS* taos, const char* sql, int sqlLen) {
  // The comparison is unsigned, so a negative sqlLen is rejected here as well.
  if ((size_t)sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
    tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    return NULL;
  }

  STscObj* pTscObj = (STscObj*)taos;
  return execQuery(pTscObj, sql, sqlLen);
}

S
Shengliang Guan 已提交
363 364
int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
  pEpSet->version = 0;
365

H
Haojun Liao 已提交
366
  // init mnode ip set
dengyihao's avatar
dengyihao 已提交
367
  SEpSet* mgmtEpSet = &(pEpSet->epSet);
368
  mgmtEpSet->numOfEps = 0;
dengyihao's avatar
dengyihao 已提交
369
  mgmtEpSet->inUse = 0;
370

S
Shengliang Guan 已提交
371 372 373 374
  if (firstEp && firstEp[0] != 0) {
    if (strlen(firstEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
S
Shengliang Guan 已提交
375
    }
S
Shengliang Guan 已提交
376 377 378 379 380 381 382 383 384

    taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[0]);
    mgmtEpSet->numOfEps++;
  }

  if (secondEp && secondEp[0] != 0) {
    if (strlen(secondEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
385
    }
S
Shengliang Guan 已提交
386 387 388

    taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
    mgmtEpSet->numOfEps++;
389 390 391 392 393 394 395 396 397 398
  }

  if (mgmtEpSet->numOfEps == 0) {
    terrno = TSDB_CODE_TSC_INVALID_FQDN;
    return -1;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
399 400 401
/**
 * Create a connection object, send the connect request to the management
 * endpoint set of the application instance, and wait for the response.
 *
 * @param user      user name stored in the connection object.
 * @param auth      encrypted secret stored in the connection object.
 * @param db        database name stored in the connection object.
 * @param fp        callback stored on the connect request.
 * @param param     callback argument stored on the connect request.
 * @param pAppInfo  application instance the connection belongs to.
 * @return the connected object, or NULL on failure. terrno is set to
 *         TSDB_CODE_TSC_OUT_OF_MEMORY when the connection or request object
 *         cannot be created.
 */
STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
                         SAppInstInfo* pAppInfo) {
  STscObj* pTscObj = createTscObj(user, auth, db, pAppInfo);
  if (NULL == pTscObj) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return pTscObj;
  }

  SRequestObj* pRequest = createRequest(pTscObj, fp, param, TDMT_MND_CONNECT);
  if (pRequest == NULL) {
    destroyTscObj(pTscObj);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  SMsgSendInfo* body = buildConnectMsg(pRequest);
  if (body == NULL) {
    // Nothing was sent, so waiting on rspSem would never return.
    // Clean up the same way as the failed-response path below.
    destroyRequest(pRequest);
    taos_close(pTscObj);
    return NULL;
  }

  int64_t transporterId = 0;
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);

  tsem_wait(&pRequest->body.rspSem);
  if (pRequest->code != TSDB_CODE_SUCCESS) {
    const char* errorMsg =
        (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
    printf("failed to connect to server, reason: %s\n\n", errorMsg);

    destroyRequest(pRequest);
    taos_close(pTscObj);
    pTscObj = NULL;
  } else {
    tscDebug("0x%" PRIx64 " connection is opening, connId:%d, dnodeConn:%p, reqId:0x%" PRIx64, pTscObj->id,
             pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId);
    destroyRequest(pRequest);
  }

  return pTscObj;
}

dengyihao's avatar
dengyihao 已提交
437 438
/**
 * Build the send-info object carrying a serialized connect request.
 *
 * The connect request is filled with the connection's current database (if
 * any), the application's pid, start time and name.
 *
 * @param pRequest  connect request; used as callback parameter and id source.
 * @return heap-allocated send info owning the serialized message, or NULL with
 *         terrno = TSDB_CODE_TSC_OUT_OF_MEMORY when an allocation fails or the
 *         serialized length is negative.
 */
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
  SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  pMsgSendInfo->msgType = TDMT_MND_CONNECT;

  pMsgSendInfo->requestObjRefId = pRequest->self;
  pMsgSendInfo->requestId = pRequest->requestId;
  pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)];
  pMsgSendInfo->param = pRequest;

  SConnectReq connectReq = {0};
  STscObj*    pObj = pRequest->pTscObj;

  char* db = getDbOfConnection(pObj);
  if (db != NULL) {
    tstrncpy(connectReq.db, db, sizeof(connectReq.db));
  }
  tfree(db);

  connectReq.pid = htonl(appInfo.pid);
  connectReq.startTime = htobe64(appInfo.startTime);
  tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));

  // First call only computes the serialized size.
  int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
  void*   pReq = (contLen >= 0) ? malloc(contLen) : NULL;
  if (pReq == NULL) {
    tfree(pMsgSendInfo);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }
  tSerializeSConnectReq(pReq, contLen, &connectReq);

  pMsgSendInfo->msgInfo.len = contLen;
  pMsgSendInfo->msgInfo.pData = pReq;
  return pMsgSendInfo;
}

473
/* Release a send-info object together with the message payload it owns. pMsgBody must not be NULL. */
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
  assert(pMsgBody != NULL);

  // Payload first, then the container that points to it.
  tfree(pMsgBody->msgInfo.pData);
  tfree(pMsgBody);
}
dengyihao's avatar
dengyihao 已提交
478
/**
 * Tell whether a message type is one of the vnode query-related responses
 * (query, fetch, result-ready, query-heartbeat).
 *
 * @param parenct  unused.
 * @param msgType  message type to test.
 */
bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType) {
  if (msgType == TDMT_VND_QUERY_RSP) {
    return true;
  }
  if (msgType == TDMT_VND_FETCH_RSP) {
    return true;
  }
  if (msgType == TDMT_VND_RES_READY_RSP) {
    return true;
  }
  return msgType == TDMT_VND_QUERY_HEARTBEAT_RSP;
}
481
/**
 * Transport callback for a server response.
 *
 * When the send info references a request, the request's response time and
 * code are recorded, the management endpoint set is updated if the server
 * reported a different one, and the exchange is logged. The response body is
 * then copied into a fresh buffer and handed to the send info's callback,
 * after which the transport buffer and the send info are released.
 *
 * @param parent  unused here.
 * @param pMsg    response message; pMsg->ahandle must be the SMsgSendInfo of the request.
 * @param pEpSet  endpoint set reported with the response; may be NULL.
 */
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->ahandle;
  assert(pMsg->ahandle != NULL);

  if (pSendInfo->requestObjRefId != 0) {
    // NOTE(review): the acquired pointer is used without a NULL check - confirm taosAcquireRef cannot fail here.
    SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
    assert(pRequest->self == pSendInfo->requestObjRefId);

    pRequest->metric.rsp = taosGetTimestampMs();
    pRequest->code = pMsg->code;

    STscObj* pTscObj = pRequest->pTscObj;
    if (pEpSet != NULL && !isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, pEpSet)) {
      updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
    }

    /*
     * There is not response callback function for submit response.
     * The actual inserted number of points is the first number.
     */
    int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start;
    if (TSDB_CODE_SUCCESS != pMsg->code) {
      tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%" PRIx64, pRequest->self,
               TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
    } else {
      tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%" PRIx64, pRequest->self,
               TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
    }

    taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
  }

  SDataBuf buf = {.len = pMsg->contLen, .pData = NULL, .handle = pMsg->handle};

  if (pMsg->contLen > 0) {
    buf.pData = calloc(1, pMsg->contLen);
    if (buf.pData != NULL) {
      memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    } else {
      // Report the allocation failure to the callback through the message code.
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pSendInfo);
}
531

dengyihao's avatar
dengyihao 已提交
532
/**
 * Connect using an already-encrypted secret instead of a plain password.
 *
 * @param ip    endpoint; may be NULL.
 * @param user  user name; TSDB_DEFAULT_USER is used when NULL.
 * @param auth  encrypted secret; required.
 * @param db    database name; may be NULL.
 * @param port  port passed to taos_connect_internal().
 * @return the connection from taos_connect_internal(), or NULL when auth is missing.
 */
TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) {
  if (user == NULL) {
    user = TSDB_DEFAULT_USER;
  }

  // ip and db are optional, so never hand NULL to a %s conversion.
  tscDebug("try to connect to %s:%u by auth, user:%s db:%s", (ip != NULL) ? ip : "", port, user,
           (db != NULL) ? db : "");

  if (auth == NULL) {
    tscError("No auth info is given, failed to connect to server");
    return NULL;
  }

  return taos_connect_internal(ip, user, NULL, auth, db, port);
}

dengyihao's avatar
dengyihao 已提交
546 547 548
TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen,
                     const char* db, int dbLen, uint16_t port) {
  char ipStr[TSDB_EP_LEN] = {0};
549
  char dbStr[TSDB_DB_NAME_LEN] = {0};
dengyihao's avatar
dengyihao 已提交
550 551
  char userStr[TSDB_USER_LEN] = {0};
  char passStr[TSDB_PASSWORD_LEN] = {0};
552

dengyihao's avatar
dengyihao 已提交
553
  strncpy(ipStr, ip, TMIN(TSDB_EP_LEN - 1, ipLen));
dengyihao's avatar
dengyihao 已提交
554 555
  strncpy(userStr, user, TMIN(TSDB_USER_LEN - 1, userLen));
  strncpy(passStr, pass, TMIN(TSDB_PASSWORD_LEN - 1, passLen));
dengyihao's avatar
dengyihao 已提交
556
  strncpy(dbStr, db, TMIN(TSDB_DB_NAME_LEN - 1, dbLen));
557
  return taos_connect(ipStr, userStr, passStr, dbStr, port);
558 559 560 561
}

/**
 * Return the next row of a request's result, fetching a new batch from the
 * server when the current one is exhausted.
 *
 * How the next batch is fetched depends on the request type:
 *  - TDMT_VND_QUERY: rows are pulled from the scheduler job.
 *  - TDMT_MND_SHOW / TDMT_MND_SHOW_RETRIEVE: a retrieve message is sent to the mnode.
 *  - TDMT_VND_SHOW_TABLES / TDMT_VND_SHOW_TABLES_FETCH: vnodes from the show
 *    info array are visited one after another.
 *
 * @return pointer to the row array of the result info, or NULL when there is
 *         no more data or a fetch failed (pRequest->code is set for a failed
 *         scheduler fetch).
 */
void* doFetchRow(SRequestObj* pRequest) {
  assert(pRequest != NULL);
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;

  SEpSet epSet = {0};

  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
    if (pRequest->type == TDMT_VND_QUERY) {
      // All data has returned to App already, no need to try again
      if (pResultInfo->completed) {
        return NULL;
      }

      int32_t code = schedulerFetchRows(pRequest->body.queryJob, (void**)&pResultInfo->pData);
      if (code != TSDB_CODE_SUCCESS) {
        pRequest->code = code;
        return NULL;
      }

      setQueryResultFromRsp(pResultInfo, (SRetrieveTableRsp*)pResultInfo->pData);
      tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
               pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed,
               pRequest->requestId);

      if (pResultInfo->numOfRows == 0) {
        return NULL;
      }

      goto _return;
    } else if (pRequest->type == TDMT_MND_SHOW) {
      pRequest->type = TDMT_MND_SHOW_RETRIEVE;
      epSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
    } else if (pRequest->type == TDMT_VND_SHOW_TABLES) {
      pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;

      SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
      SVgroupInfo*  pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex);

      epSet = pVgroupInfo->epSet;
    } else if (pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) {
      // Current vnode is exhausted: issue a show-tables request to the next one, then fetch from it below.
      pRequest->type = TDMT_VND_SHOW_TABLES;

      SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
      pShowReqInfo->currentIndex += 1;
      if (pShowReqInfo->currentIndex >= taosArrayGetSize(pShowReqInfo->pArray)) {
        return NULL;
      }

      SVgroupInfo*     pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex);
      SVShowTablesReq* pShowReq = calloc(1, sizeof(SVShowTablesReq));
      pShowReq->head.vgId = htonl(pVgroupInfo->vgId);

      pRequest->body.requestMsg.len = sizeof(SVShowTablesReq);
      pRequest->body.requestMsg.pData = pShowReq;

      SMsgSendInfo* pShowSendInfo = buildMsgInfoImpl(pRequest);
      epSet = pVgroupInfo->epSet;

      int64_t  showTransporterId = 0;
      STscObj* pShowTscObj = pRequest->pTscObj;
      asyncSendMsgToServer(pShowTscObj->pAppInfo->pTransporter, &epSet, &showTransporterId, pShowSendInfo);
      tsem_wait(&pRequest->body.rspSem);

      pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
    } else if (pRequest->type == TDMT_MND_SHOW_RETRIEVE) {
      epSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);

      if (pResultInfo->completed) {
        return NULL;
      }
    }

    SMsgSendInfo* pSendInfo = buildMsgInfoImpl(pRequest);

    int64_t  transporterId = 0;
    STscObj* pTscObj = pRequest->pTscObj;
    asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, pSendInfo);

    tsem_wait(&pRequest->body.rspSem);

    pResultInfo->current = 0;
    if (pResultInfo->numOfRows <= pResultInfo->current) {
      return NULL;
    }
  }

_return:

  // Point every column of the row at the cell for the current row index.
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
    pResultInfo->row[i] = pResultInfo->pCol[i] + pResultInfo->fields[i].bytes * pResultInfo->current;
    if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
      pResultInfo->length[i] = varDataLen(pResultInfo->row[i]);
      pResultInfo->row[i] = varDataVal(pResultInfo->row[i]);
    }
  }

  pResultInfo->current += 1;
  return pResultInfo->row;
}

H
Haojun Liao 已提交
658 659
/*
 * Allocate the per-column row, column-base and length arrays of a result info.
 * Does nothing when the row array already exists.
 */
static void doPrepareResPtr(SReqResultInfo* pResInfo) {
  if (pResInfo->row != NULL) {
    return;
  }

  pResInfo->row = calloc(pResInfo->numOfCols, POINTER_BYTES);
  pResInfo->pCol = calloc(pResInfo->numOfCols, POINTER_BYTES);
  pResInfo->length = calloc(pResInfo->numOfCols, sizeof(int32_t));
}

666
/**
 * Set the per-column base pointers and default lengths for the current batch.
 *
 * Column i starts at pData + (sum of the byte widths of the preceding
 * columns) * numOfRows, where the row count is read from pResultInfo.
 * Nothing is done when the numOfRows argument is 0.
 *
 * @param pResultInfo  result info whose row/pCol/length arrays are filled.
 * @param pFields      field descriptors; must not be NULL (widths are read from pResultInfo->fields).
 * @param numOfCols    number of columns; must be > 0.
 * @param numOfRows    number of rows in the batch.
 */
void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) {
  assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL);

  if (numOfRows != 0) {
    // todo check for the failure of malloc
    doPrepareResPtr(pResultInfo);

    int32_t offset = 0;
    int32_t i = 0;
    while (i < numOfCols) {
      pResultInfo->length[i] = pResultInfo->fields[i].bytes;
      pResultInfo->row[i] = (char*)(pResultInfo->pData + offset * pResultInfo->numOfRows);
      pResultInfo->pCol[i] = pResultInfo->row[i];
      offset += pResultInfo->fields[i].bytes;
      ++i;
    }
  }
}

H
Haojun Liao 已提交
684
/**
 * Return a heap copy of the connection's current database name.
 *
 * The name is read while holding the connection mutex.
 *
 * @return newly allocated copy that the caller must free, or NULL when no
 *         database is set (empty name).
 */
char* getDbOfConnection(STscObj* pObj) {
  char* dbCopy = NULL;

  pthread_mutex_lock(&pObj->mutex);
  if (pObj->db[0] != '\0') {
    dbCopy = strndup(pObj->db, tListLen(pObj->db));
  }
  pthread_mutex_unlock(&pObj->mutex);

  return dbCopy;
}

/**
 * Set the current database name of a connection.
 *
 * The name is copied into pTscObj->db, bounded by the size of that buffer,
 * while holding the connection mutex.
 *
 * @param pTscObj  connection to update; must not be NULL.
 * @param db       new database name; must not be NULL.
 */
void setConnectionDB(STscObj* pTscObj, const char* db) {
  assert(db != NULL && pTscObj != NULL);
  pthread_mutex_lock(&pTscObj->mutex);
  tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
  pthread_mutex_unlock(&pTscObj->mutex);
}
S
Shengliang Guan 已提交
702

703
/**
 * Load a retrieve response into the result info: remember the raw message,
 * point pData at its payload, take the row count (converted with htonl) and
 * the completed flag, reset the cursor, add the rows to the running total and
 * set up the column pointers for the new batch.
 *
 * @param pResultInfo  result info to update; must not be NULL.
 * @param pRsp         retrieve response; must not be NULL.
 */
void setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp) {
  assert(pResultInfo != NULL && pRsp != NULL);

  // Raw message and payload.
  pResultInfo->pRspMsg = (const char*)pRsp;
  pResultInfo->pData = (void*)pRsp->data;

  // Batch bookkeeping.
  pResultInfo->numOfRows = htonl(pRsp->numOfRows);
  pResultInfo->totalRows += pResultInfo->numOfRows;
  pResultInfo->completed = (pRsp->completed == 1);
  pResultInfo->current = 0;

  setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows);
}