clientImpl.c 22.5 KB
Newer Older
1

2
#include "clientInt.h"
3
#include "clientLog.h"
4 5 6
#include "parser.h"
#include "planner.h"
#include "scheduler.h"
H
Haojun Liao 已提交
7
#include "tdatablock.h"
8
#include "tdef.h"
9
#include "tglobal.h"
10
#include "tmsgtype.h"
H
Haojun Liao 已提交
11
#include "tpagedbuf.h"
12
#include "tref.h"
X
Xiaoyu Wang 已提交
13

S
Shengliang Guan 已提交
14
static int32_t       initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
dengyihao's avatar
dengyihao 已提交
15 16 17
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest);
static void          destroySendMsgInfo(SMsgSendInfo* pMsgBody);
static void          setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp);
H
Haojun Liao 已提交
18

19
static bool stringLengthCheck(const char* str, size_t maxsize) {
20 21 22 23 24 25 26 27 28 29 30 31
  if (str == NULL) {
    return false;
  }

  size_t len = strlen(str);
  if (len <= 0 || len > maxsize) {
    return false;
  }

  return true;
}

dengyihao's avatar
dengyihao 已提交
32
static bool validateUserName(const char* user) { return stringLengthCheck(user, TSDB_USER_LEN - 1); }
33

dengyihao's avatar
dengyihao 已提交
34
static bool validatePassword(const char* passwd) { return stringLengthCheck(passwd, TSDB_PASSWORD_LEN - 1); }
35

dengyihao's avatar
dengyihao 已提交
36
static bool validateDbName(const char* db) { return stringLengthCheck(db, TSDB_DB_NAME_LEN - 1); }
37

38 39 40 41 42 43
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
  char key[512] = {0};
  snprintf(key, sizeof(key), "%s:%s:%s:%d", user, auth, ip, port);
  return strdup(key);
}

dengyihao's avatar
dengyihao 已提交
44 45 46
static STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
                                SAppInstInfo* pAppInfo);
static void     setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
47

dengyihao's avatar
dengyihao 已提交
48 49
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
                            uint16_t port) {
50 51 52 53
  if (taos_init() != TSDB_CODE_SUCCESS) {
    return NULL;
  }

54 55 56 57 58
  if (!validateUserName(user)) {
    terrno = TSDB_CODE_TSC_INVALID_USER_LENGTH;
    return NULL;
  }

59
  char localDb[TSDB_DB_NAME_LEN] = {0};
60
  if (db != NULL) {
dengyihao's avatar
dengyihao 已提交
61
    if (!validateDbName(db)) {
62 63 64 65
      terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH;
      return NULL;
    }

66 67
    tstrncpy(localDb, db, sizeof(localDb));
    strdequote(localDb);
68 69
  }

70
  char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
71 72 73 74 75 76
  if (auth == NULL) {
    if (!validatePassword(pass)) {
      terrno = TSDB_CODE_TSC_INVALID_PASS_LENGTH;
      return NULL;
    }

dengyihao's avatar
dengyihao 已提交
77
    taosEncryptPass_c((uint8_t*)pass, strlen(pass), secretEncrypt);
78 79 80 81
  } else {
    tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt));
  }

82
  SCorEpSet epSet = {0};
S
Shengliang Guan 已提交
83 84 85 86 87 88 89 90 91 92 93 94 95
  if (ip) {
    if (initEpSetFromCfg(ip, NULL, &epSet) < 0) {
      return NULL;
    }

    if (port) {
      epSet.epSet.eps[0].port = port;
    }
  } else {
    if (initEpSetFromCfg(tsFirst, tsSecond, &epSet) < 0) {
      return NULL;
    }
  }
96

dengyihao's avatar
dengyihao 已提交
97
  char*          key = getClusterKey(user, secretEncrypt, ip, port);
H
Haojun Liao 已提交
98
  SAppInstInfo** pInst = NULL;
99

H
Haojun Liao 已提交
100 101 102
  pthread_mutex_lock(&appInfo.mutex);

  pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
L
Liu Jicong 已提交
103
  SAppInstInfo* p = NULL;
104
  if (pInst == NULL) {
L
Liu Jicong 已提交
105
    p = calloc(1, sizeof(struct SAppInstInfo));
dengyihao's avatar
dengyihao 已提交
106
    p->mgmtEp = epSet;
H
Haojun Liao 已提交
107
    p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
D
dapan 已提交
108
    p->pAppHbMgr = appHbMgrInit(p, key);
H
Haojun Liao 已提交
109
    taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
110

H
Haojun Liao 已提交
111
    pInst = &p;
112 113
  }

H
Haojun Liao 已提交
114 115
  pthread_mutex_unlock(&appInfo.mutex);

H
Haojun Liao 已提交
116
  tfree(key);
H
Haojun Liao 已提交
117
  return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst);
118 119
}

dengyihao's avatar
dengyihao 已提交
120
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest) {
X
Xiaoyu Wang 已提交
121 122
  *pRequest = createRequest(pTscObj, NULL, NULL, TSDB_SQL_SELECT);
  if (*pRequest == NULL) {
123
    tscError("failed to malloc sqlObj");
X
Xiaoyu Wang 已提交
124
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
125 126
  }

X
Xiaoyu Wang 已提交
127 128
  (*pRequest)->sqlstr = malloc(sqlLen + 1);
  if ((*pRequest)->sqlstr == NULL) {
dengyihao's avatar
dengyihao 已提交
129
    tscError("0x%" PRIx64 " failed to prepare sql string buffer", (*pRequest)->self);
X
Xiaoyu Wang 已提交
130 131
    (*pRequest)->msgBuf = strdup("failed to prepare sql string buffer");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
132 133
  }

X
Xiaoyu Wang 已提交
134 135 136
  strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen);
  (*pRequest)->sqlstr[sqlLen] = 0;
  (*pRequest)->sqlLen = sqlLen;
137

dengyihao's avatar
dengyihao 已提交
138
  tscDebugL("0x%" PRIx64 " SQL: %s, reqId:0x%" PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId);
X
Xiaoyu Wang 已提交
139 140
  return TSDB_CODE_SUCCESS;
}
141

X
Xiaoyu Wang 已提交
142
int32_t parseSql(SRequestObj* pRequest, SQuery** pQuery) {
H
Haojun Liao 已提交
143 144
  STscObj* pTscObj = pRequest->pTscObj;

X
Xiaoyu Wang 已提交
145
  SParseContext cxt = {
dengyihao's avatar
dengyihao 已提交
146 147 148 149 150 151 152 153
      .requestId = pRequest->requestId,
      .acctId = pTscObj->acctId,
      .db = getDbOfConnection(pTscObj),
      .pSql = pRequest->sqlstr,
      .sqlLen = pRequest->sqlLen,
      .pMsg = pRequest->msgBuf,
      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
      .pTransporter = pTscObj->pAppInfo->pTransporter,
X
Xiaoyu Wang 已提交
154
  };
H
Haojun Liao 已提交
155

H
Haojun Liao 已提交
156 157
  cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
H
Haojun Liao 已提交
158
  if (code != TSDB_CODE_SUCCESS) {
H
Haojun Liao 已提交
159
    tfree(cxt.db);
H
Haojun Liao 已提交
160 161 162 163
    return code;
  }

  code = qParseQuerySql(&cxt, pQuery);
164
  if (TSDB_CODE_SUCCESS == code && ((*pQuery)->haveResultSet)) {
X
Xiaoyu Wang 已提交
165 166
    setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols);
  }
167

H
Haojun Liao 已提交
168
  tfree(cxt.db);
X
Xiaoyu Wang 已提交
169 170
  return code;
}
H
Haojun Liao 已提交
171

X
Xiaoyu Wang 已提交
172
int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
173 174 175
  SCmdMsgInfo* pMsgInfo = pQuery->pCmdMsg;
  pRequest->type = pMsgInfo->msgType;
  pRequest->body.requestMsg = (SDataBuf){.pData = pMsgInfo->pMsg, .len = pMsgInfo->msgLen, .handle = NULL};
X
Xiaoyu Wang 已提交
176
  pMsgInfo->pMsg = NULL; // pMsg transferred to SMsgSendInfo management
177 178 179

  STscObj*      pTscObj = pRequest->pTscObj;
  SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
180 181 182 183 184 185 186 187

  if (pMsgInfo->msgType == TDMT_VND_SHOW_TABLES) {
    SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
    if (pShowReqInfo->pArray == NULL) {
      pShowReqInfo->currentIndex = 0;  // set the first vnode/ then iterate the next vnode
      pShowReqInfo->pArray = pMsgInfo->pExtension;
    }
  }
188 189
  int64_t transporterId = 0;
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg);
X
Xiaoyu Wang 已提交
190

191
  tsem_wait(&pRequest->body.rspSem);
X
Xiaoyu Wang 已提交
192 193
  return TSDB_CODE_SUCCESS;
}
X
Xiaoyu Wang 已提交
194

195 196
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
  pRequest->type = pQuery->msgType;
X
Xiaoyu Wang 已提交
197
  SPlanContext cxt = { .queryId = pRequest->requestId, .pAstRoot = pQuery->pRoot, .acctId = pRequest->pTscObj->acctId };
198
  int32_t  code = qCreateQueryPlan(&cxt, pPlan, pNodeList);
X
Xiaoyu Wang 已提交
199 200 201 202
  if (code != 0) {
    return code;
  }
  return code;
X
Xiaoyu Wang 已提交
203 204
}

205 206
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols) {
  assert(pSchema != NULL && numOfCols > 0);
207

208 209
  pResInfo->numOfCols = numOfCols;
  pResInfo->fields = calloc(numOfCols, sizeof(pSchema[0]));
210 211

  for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
212
    pResInfo->fields[i].bytes = pSchema[i].bytes;
dengyihao's avatar
dengyihao 已提交
213
    pResInfo->fields[i].type = pSchema[i].type;
214
    tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
215
  }
X
Xiaoyu Wang 已提交
216 217
}

X
Xiaoyu Wang 已提交
218
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
219
  void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
D
dapan1121 已提交
220
  SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
D
dapan1121 已提交
221
  int32_t      code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr, &res);
D
dapan1121 已提交
222
  if (code != TSDB_CODE_SUCCESS) {
D
dapan1121 已提交
223 224
    if (pRequest->body.queryJob != 0) {
      schedulerFreeJob(pRequest->body.queryJob);
225 226
    }

D
dapan1121 已提交
227
    pRequest->errList = res.errList;
D
dapan1121 已提交
228
    pRequest->code = code;
H
Haojun Liao 已提交
229
    return pRequest->code;
X
Xiaoyu Wang 已提交
230
  }
231

232
  if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_CREATE_TABLE == pRequest->type) {
D
dapan1121 已提交
233 234
    pRequest->body.resInfo.numOfRows = res.numOfRows;
    
D
dapan1121 已提交
235 236
    if (pRequest->body.queryJob != 0) {
      schedulerFreeJob(pRequest->body.queryJob);
D
dapan1121 已提交
237 238
    }
  }
D
dapan1121 已提交
239 240

  pRequest->errList = res.errList;  
D
dapan1121 已提交
241 242
  pRequest->code = res.code;
  return pRequest->code;
X
Xiaoyu Wang 已提交
243
}
X
Xiaoyu Wang 已提交
244

D
dapan1121 已提交
245
SRequestObj* execQueryImpl(STscObj* pTscObj, const char* sql, int sqlLen) {
S
Shengliang Guan 已提交
246
  SRequestObj* pRequest = NULL;
X
Xiaoyu Wang 已提交
247
  SQuery* pQuery = NULL;
X
Xiaoyu Wang 已提交
248
  SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
X
Xiaoyu Wang 已提交
249

X
Xiaoyu Wang 已提交
250
  terrno = TSDB_CODE_SUCCESS;
X
Xiaoyu Wang 已提交
251
  CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
X
Xiaoyu Wang 已提交
252
  CHECK_CODE_GOTO(parseSql(pRequest, &pQuery), _return);
H
Haojun Liao 已提交
253

254
  if (pQuery->directRpc) {
X
Xiaoyu Wang 已提交
255
    CHECK_CODE_GOTO(execDdlQuery(pRequest, pQuery), _return);
H
Haojun Liao 已提交
256
  } else {
X
Xiaoyu Wang 已提交
257
    CHECK_CODE_GOTO(getPlan(pRequest, pQuery, &pRequest->body.pDag, pNodeList), _return);
258
    CHECK_CODE_GOTO(scheduleQuery(pRequest, pRequest->body.pDag, pNodeList), _return);
X
Xiaoyu Wang 已提交
259
    pRequest->code = terrno;
X
Xiaoyu Wang 已提交
260 261 262
  }

_return:
263
  taosArrayDestroy(pNodeList);
X
Xiaoyu Wang 已提交
264
  qDestroyQuery(pQuery);
X
Xiaoyu Wang 已提交
265
  if (NULL != pRequest && TSDB_CODE_SUCCESS != terrno) {
X
Xiaoyu Wang 已提交
266 267
    pRequest->code = terrno;
  }
268

269 270 271
  return pRequest;
}

D
dapan1121 已提交
272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358
int32_t clientProcessErrorList(SArray **pList) {
  SArray *errList = *pList;
  int32_t errNum = (int32_t)taosArrayGetSize(errList);
  
  for (int32_t i = 0; i < errNum; ++i) {
    SQueryErrorInfo *errInfo = taosArrayGet(errList, i);
    if (TSDB_CODE_VND_HASH_MISMATCH == errInfo->code) {
      if (i == (errNum - 1)) {
        break;
      }
      
      // TODO REMOVE SAME DB ERROR
    } else {
      taosArrayRemove(errList, i);
      --i;
      --errNum;
    }
  }

  if (0 == errNum) {
    taosArrayDestroy(*pList);
    *pList = NULL;
  }

  return TSDB_CODE_SUCCESS;
}


SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
  SRequestObj* pRequest = NULL;
  int32_t code = 0;
  bool quit = false;

  while (!quit) {
    pRequest = execQueryImpl(pTscObj, sql, sqlLen);
    if (TSDB_CODE_SUCCESS == pRequest->code || NULL == pRequest->errList) {
      break;
    }

    code = clientProcessErrorList(&pRequest->errList);
    if (code != TSDB_CODE_SUCCESS || NULL == pRequest->errList) {
      break;
    }

    int32_t errNum = (int32_t)taosArrayGetSize(pRequest->errList);
    for (int32_t i = 0; i < errNum; ++i) {
      SQueryErrorInfo *errInfo = taosArrayGet(pRequest->errList, i);
      
      if (TSDB_CODE_VND_HASH_MISMATCH == errInfo->code) {
        SCatalog *pCatalog = NULL;
        code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
        if (code != TSDB_CODE_SUCCESS) {
          quit = true;
          break;
        }
        SEpSet epset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);

        char dbFName[TSDB_DB_FNAME_LEN];
        tNameGetFullDbName(&errInfo->tableName, dbFName);
        
        code = catalogRefreshDBVgInfo(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, dbFName);
        if (code != TSDB_CODE_SUCCESS) {
          quit = true;
          break;
        }
      }
    }
  }

  if (code) {
    pRequest->code = code;
  }
  
  return pRequest;
}

TAOS_RES* taos_query_l(TAOS* taos, const char* sql, int sqlLen) {
  STscObj* pTscObj = (STscObj*)taos;
  if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
    tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    return NULL;
  }

  return execQuery(pTscObj, sql, sqlLen);
}

S
Shengliang Guan 已提交
359 360
int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
  pEpSet->version = 0;
361

H
Haojun Liao 已提交
362
  // init mnode ip set
dengyihao's avatar
dengyihao 已提交
363
  SEpSet* mgmtEpSet = &(pEpSet->epSet);
364
  mgmtEpSet->numOfEps = 0;
dengyihao's avatar
dengyihao 已提交
365
  mgmtEpSet->inUse = 0;
366

S
Shengliang Guan 已提交
367 368 369 370
  if (firstEp && firstEp[0] != 0) {
    if (strlen(firstEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
S
Shengliang Guan 已提交
371
    }
S
Shengliang Guan 已提交
372 373 374 375 376 377 378 379 380

    taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[0]);
    mgmtEpSet->numOfEps++;
  }

  if (secondEp && secondEp[0] != 0) {
    if (strlen(secondEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
381
    }
S
Shengliang Guan 已提交
382 383 384

    taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
    mgmtEpSet->numOfEps++;
385 386 387 388 389 390 391 392 393 394
  }

  if (mgmtEpSet->numOfEps == 0) {
    terrno = TSDB_CODE_TSC_INVALID_FQDN;
    return -1;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
395 396 397
STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
                         SAppInstInfo* pAppInfo) {
  STscObj* pTscObj = createTscObj(user, auth, db, pAppInfo);
398 399 400 401 402
  if (NULL == pTscObj) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return pTscObj;
  }

dengyihao's avatar
dengyihao 已提交
403
  SRequestObj* pRequest = createRequest(pTscObj, fp, param, TDMT_MND_CONNECT);
404 405 406
  if (pRequest == NULL) {
    destroyTscObj(pTscObj);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
407 408 409
    return NULL;
  }

410
  SMsgSendInfo* body = buildConnectMsg(pRequest);
411 412

  int64_t transporterId = 0;
H
Haojun Liao 已提交
413
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
414 415 416

  tsem_wait(&pRequest->body.rspSem);
  if (pRequest->code != TSDB_CODE_SUCCESS) {
dengyihao's avatar
dengyihao 已提交
417 418
    const char* errorMsg =
        (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
419 420 421 422 423 424
    printf("failed to connect to server, reason: %s\n\n", errorMsg);

    destroyRequest(pRequest);
    taos_close(pTscObj);
    pTscObj = NULL;
  } else {
dengyihao's avatar
dengyihao 已提交
425 426
    tscDebug("0x%" PRIx64 " connection is opening, connId:%d, dnodeConn:%p, reqId:0x%" PRIx64, pTscObj->id,
             pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId);
427 428 429 430 431 432
    destroyRequest(pRequest);
  }

  return pTscObj;
}

dengyihao's avatar
dengyihao 已提交
433 434
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
  SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
435 436 437 438 439
  if (pMsgSendInfo == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

dengyihao's avatar
dengyihao 已提交
440
  pMsgSendInfo->msgType = TDMT_MND_CONNECT;
441

442
  pMsgSendInfo->requestObjRefId = pRequest->self;
dengyihao's avatar
dengyihao 已提交
443 444 445
  pMsgSendInfo->requestId = pRequest->requestId;
  pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)];
  pMsgSendInfo->param = pRequest;
446

S
Shengliang Guan 已提交
447 448
  SConnectReq connectReq = {0};
  STscObj*    pObj = pRequest->pTscObj;
449

H
Haojun Liao 已提交
450
  char* db = getDbOfConnection(pObj);
451
  if (db != NULL) {
S
Shengliang Guan 已提交
452
    tstrncpy(connectReq.db, db, sizeof(connectReq.db));
453
  }
454
  tfree(db);
455

S
Shengliang Guan 已提交
456 457 458 459 460 461 462
  connectReq.pid = htonl(appInfo.pid);
  connectReq.startTime = htobe64(appInfo.startTime);
  tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));

  int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
  void*   pReq = malloc(contLen);
  tSerializeSConnectReq(pReq, contLen, &connectReq);
463

S
Shengliang Guan 已提交
464 465
  pMsgSendInfo->msgInfo.len = contLen;
  pMsgSendInfo->msgInfo.pData = pReq;
466
  return pMsgSendInfo;
467 468
}

469
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
H
Haojun Liao 已提交
470
  assert(pMsgBody != NULL);
471 472
  tfree(pMsgBody->msgInfo.pData);
  tfree(pMsgBody);
H
Haojun Liao 已提交
473
}
dengyihao's avatar
dengyihao 已提交
474
bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType) {
D
dapan1121 已提交
475
  return msgType == TDMT_VND_QUERY_RSP || msgType == TDMT_VND_FETCH_RSP || msgType == TDMT_VND_RES_READY_RSP || msgType == TDMT_VND_QUERY_HEARTBEAT_RSP;
dengyihao's avatar
dengyihao 已提交
476
}
477
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
dengyihao's avatar
dengyihao 已提交
478
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->ahandle;
479
  assert(pMsg->ahandle != NULL);
480

481
  if (pSendInfo->requestObjRefId != 0) {
dengyihao's avatar
dengyihao 已提交
482
    SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
483
    assert(pRequest->self == pSendInfo->requestObjRefId);
484

485 486
    pRequest->metric.rsp = taosGetTimestampMs();
    pRequest->code = pMsg->code;
487

dengyihao's avatar
dengyihao 已提交
488
    STscObj* pTscObj = pRequest->pTscObj;
489 490 491 492
    if (pEpSet) {
      if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, pEpSet)) {
        updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
      }
493 494
    }

495
    /*
dengyihao's avatar
dengyihao 已提交
496 497
     * There is not response callback function for submit response.
     * The actual inserted number of points is the first number.
498
     */
499
    int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start;
500
    if (pMsg->code == TSDB_CODE_SUCCESS) {
dengyihao's avatar
dengyihao 已提交
501 502
      tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%" PRIx64, pRequest->self,
               TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
503
    } else {
dengyihao's avatar
dengyihao 已提交
504 505
      tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%" PRIx64, pRequest->self,
               TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
506
    }
507

H
Haojun Liao 已提交
508
    taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
509 510
  }

dengyihao's avatar
dengyihao 已提交
511
  SDataBuf buf = {.len = pMsg->contLen, .pData = NULL, .handle = pMsg->handle};
512 513 514 515 516 517 518 519 520

  if (pMsg->contLen > 0) {
    buf.pData = calloc(1, pMsg->contLen);
    if (buf.pData == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    }
521 522
  }

523
  pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
524
  rpcFreeCont(pMsg->pCont);
525
  destroySendMsgInfo(pSendInfo);
526
}
527

dengyihao's avatar
dengyihao 已提交
528
TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) {
529 530 531 532 533 534 535 536 537 538 539 540 541
  tscDebug("try to connect to %s:%u by auth, user:%s db:%s", ip, port, user, db);
  if (user == NULL) {
    user = TSDB_DEFAULT_USER;
  }

  if (auth == NULL) {
    tscError("No auth info is given, failed to connect to server");
    return NULL;
  }

  return taos_connect_internal(ip, user, NULL, auth, db, port);
}

dengyihao's avatar
dengyihao 已提交
542 543 544
TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen,
                     const char* db, int dbLen, uint16_t port) {
  char ipStr[TSDB_EP_LEN] = {0};
545
  char dbStr[TSDB_DB_NAME_LEN] = {0};
dengyihao's avatar
dengyihao 已提交
546 547
  char userStr[TSDB_USER_LEN] = {0};
  char passStr[TSDB_PASSWORD_LEN] = {0};
548

dengyihao's avatar
dengyihao 已提交
549
  strncpy(ipStr, ip, TMIN(TSDB_EP_LEN - 1, ipLen));
dengyihao's avatar
dengyihao 已提交
550 551
  strncpy(userStr, user, TMIN(TSDB_USER_LEN - 1, userLen));
  strncpy(passStr, pass, TMIN(TSDB_PASSWORD_LEN - 1, passLen));
dengyihao's avatar
dengyihao 已提交
552
  strncpy(dbStr, db, TMIN(TSDB_DB_NAME_LEN - 1, dbLen));
553
  return taos_connect(ipStr, userStr, passStr, dbStr, port);
554 555 556 557
}

void* doFetchRow(SRequestObj* pRequest) {
  assert(pRequest != NULL);
558
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
559

H
Haojun Liao 已提交
560 561
  SEpSet epSet = {0};

H
Haojun Liao 已提交
562
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
563
    if (pRequest->type == TDMT_VND_QUERY) {
564 565 566 567 568
      // All data has returned to App already, no need to try again
      if (pResultInfo->completed) {
        return NULL;
      }

569
      SReqResultInfo* pResInfo = &pRequest->body.resInfo;
D
dapan1121 已提交
570
      int32_t         code = schedulerFetchRows(pRequest->body.queryJob, (void**)&pResInfo->pData);
571 572 573 574
      if (code != TSDB_CODE_SUCCESS) {
        pRequest->code = code;
        return NULL;
      }
H
Haojun Liao 已提交
575

576
      setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData);
dengyihao's avatar
dengyihao 已提交
577 578
      tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
               pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId);
579

580
      if (pResultInfo->numOfRows == 0) {
D
dapan1121 已提交
581 582 583 584
        return NULL;
      }

      goto _return;
585
    } else if (pRequest->type == TDMT_MND_SHOW) {
H
Haojun Liao 已提交
586
      pRequest->type = TDMT_MND_SHOW_RETRIEVE;
H
Haojun Liao 已提交
587
      epSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
H
Haojun Liao 已提交
588 589
    } else if (pRequest->type == TDMT_VND_SHOW_TABLES) {
      pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
H
Haojun Liao 已提交
590
      SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
dengyihao's avatar
dengyihao 已提交
591
      SVgroupInfo*  pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex);
H
Haojun Liao 已提交
592

L
Liu Jicong 已提交
593
      epSet = pVgroupInfo->epSet;
H
Haojun Liao 已提交
594 595 596 597 598 599 600 601
    } else if (pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) {
      pRequest->type = TDMT_VND_SHOW_TABLES;
      SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
      pShowReqInfo->currentIndex += 1;
      if (pShowReqInfo->currentIndex >= taosArrayGetSize(pShowReqInfo->pArray)) {
        return NULL;
      }

dengyihao's avatar
dengyihao 已提交
602
      SVgroupInfo*     pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex);
H
Haojun Liao 已提交
603 604 605 606 607 608 609
      SVShowTablesReq* pShowReq = calloc(1, sizeof(SVShowTablesReq));
      pShowReq->head.vgId = htonl(pVgroupInfo->vgId);

      pRequest->body.requestMsg.len = sizeof(SVShowTablesReq);
      pRequest->body.requestMsg.pData = pShowReq;

      SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
L
Liu Jicong 已提交
610
      epSet = pVgroupInfo->epSet;
H
Haojun Liao 已提交
611

H
Haojun Liao 已提交
612
      int64_t  transporterId = 0;
dengyihao's avatar
dengyihao 已提交
613
      STscObj* pTscObj = pRequest->pTscObj;
H
Haojun Liao 已提交
614
      asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body);
H
Haojun Liao 已提交
615 616 617
      tsem_wait(&pRequest->body.rspSem);

      pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
dengyihao's avatar
dengyihao 已提交
618
    } else if (pRequest->type == TDMT_MND_SHOW_RETRIEVE) {
D
dapan1121 已提交
619
      epSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
dengyihao's avatar
dengyihao 已提交
620

D
dapan1121 已提交
621 622 623
      if (pResultInfo->completed) {
        return NULL;
      }
H
Haojun Liao 已提交
624
    }
H
Haojun Liao 已提交
625

626
    SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
H
Haojun Liao 已提交
627

628
    int64_t  transporterId = 0;
dengyihao's avatar
dengyihao 已提交
629
    STscObj* pTscObj = pRequest->pTscObj;
H
Haojun Liao 已提交
630
    asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body);
H
Haojun Liao 已提交
631 632 633 634 635 636 637 638 639

    tsem_wait(&pRequest->body.rspSem);

    pResultInfo->current = 0;
    if (pResultInfo->numOfRows <= pResultInfo->current) {
      return NULL;
    }
  }

D
dapan1121 已提交
640 641
_return:

dengyihao's avatar
dengyihao 已提交
642
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
H
Haojun Liao 已提交
643 644 645 646
    pResultInfo->row[i] = pResultInfo->pCol[i] + pResultInfo->fields[i].bytes * pResultInfo->current;
    if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
      pResultInfo->length[i] = varDataLen(pResultInfo->row[i]);
      pResultInfo->row[i] = varDataVal(pResultInfo->row[i]);
647
    }
H
Haojun Liao 已提交
648 649 650 651 652 653
  }

  pResultInfo->current += 1;
  return pResultInfo->row;
}

H
Haojun Liao 已提交
654 655
static void doPrepareResPtr(SReqResultInfo* pResInfo) {
  if (pResInfo->row == NULL) {
dengyihao's avatar
dengyihao 已提交
656 657
    pResInfo->row = calloc(pResInfo->numOfCols, POINTER_BYTES);
    pResInfo->pCol = calloc(pResInfo->numOfCols, POINTER_BYTES);
H
Haojun Liao 已提交
658 659 660 661
    pResInfo->length = calloc(pResInfo->numOfCols, sizeof(int32_t));
  }
}

662
void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) {
H
Haojun Liao 已提交
663 664 665 666 667
  assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL);
  if (numOfRows == 0) {
    return;
  }

H
Haojun Liao 已提交
668 669 670
  // todo check for the failure of malloc
  doPrepareResPtr(pResultInfo);

H
Haojun Liao 已提交
671 672 673
  int32_t offset = 0;
  for (int32_t i = 0; i < numOfCols; ++i) {
    pResultInfo->length[i] = pResultInfo->fields[i].bytes;
dengyihao's avatar
dengyihao 已提交
674 675
    pResultInfo->row[i] = (char*)(pResultInfo->pData + offset * pResultInfo->numOfRows);
    pResultInfo->pCol[i] = pResultInfo->row[i];
H
Haojun Liao 已提交
676
    offset += pResultInfo->fields[i].bytes;
677
  }
S
Shengliang Guan 已提交
678 679
}

H
Haojun Liao 已提交
680
char* getDbOfConnection(STscObj* pObj) {
dengyihao's avatar
dengyihao 已提交
681
  char* p = NULL;
682
  pthread_mutex_lock(&pObj->mutex);
683 684 685 686
  size_t len = strlen(pObj->db);
  if (len > 0) {
    p = strndup(pObj->db, tListLen(pObj->db));
  }
S
Shengliang Guan 已提交
687

688
  pthread_mutex_unlock(&pObj->mutex);
689 690 691 692 693 694 695 696 697
  return p;
}

void setConnectionDB(STscObj* pTscObj, const char* db) {
  assert(db != NULL && pTscObj != NULL);
  pthread_mutex_lock(&pTscObj->mutex);
  tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
  pthread_mutex_unlock(&pTscObj->mutex);
}
S
Shengliang Guan 已提交
698

699
void setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp) {
700 701
  assert(pResultInfo != NULL && pRsp != NULL);

dengyihao's avatar
dengyihao 已提交
702 703
  pResultInfo->pRspMsg = (const char*)pRsp;
  pResultInfo->pData = (void*)pRsp->data;
H
Haojun Liao 已提交
704
  pResultInfo->numOfRows = htonl(pRsp->numOfRows);
dengyihao's avatar
dengyihao 已提交
705
  pResultInfo->current = 0;
706
  pResultInfo->completed = (pRsp->completed == 1);
H
Haojun Liao 已提交
707

708
  pResultInfo->totalRows += pResultInfo->numOfRows;
H
Haojun Liao 已提交
709
  setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows);
L
fix  
Liu Jicong 已提交
710
}