clientImpl.c 24.2 KB
Newer Older
1

2
#include "clientInt.h"
3
#include "clientLog.h"
4 5 6
#include "parser.h"
#include "planner.h"
#include "scheduler.h"
H
Haojun Liao 已提交
7
#include "tdatablock.h"
8
#include "tdef.h"
9
#include "tglobal.h"
10
#include "tmsgtype.h"
H
Haojun Liao 已提交
11
#include "tpagedbuf.h"
12
#include "tref.h"
X
Xiaoyu Wang 已提交
13

S
Shengliang Guan 已提交
14
static int32_t       initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
dengyihao's avatar
dengyihao 已提交
15 16 17
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest);
static void          destroySendMsgInfo(SMsgSendInfo* pMsgBody);
static void          setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp);
H
Haojun Liao 已提交
18

19
/*
 * Returns true when `str` is a non-NULL, non-empty C string whose length
 * (terminating NUL not counted) does not exceed `maxsize`.
 */
static bool stringLengthCheck(const char* str, size_t maxsize) {
  if (NULL == str) {
    return false;
  }

  const size_t length = strlen(str);
  return (length > 0) && (length <= maxsize);
}

dengyihao's avatar
dengyihao 已提交
32
/* A user name is accepted when it is non-empty and at most TSDB_USER_LEN - 1 characters long. */
static bool validateUserName(const char* user) {
  const size_t maxUserChars = TSDB_USER_LEN - 1;
  return stringLengthCheck(user, maxUserChars);
}
33

dengyihao's avatar
dengyihao 已提交
34
/* A password is accepted when it is non-empty and at most TSDB_PASSWORD_LEN - 1 characters long. */
static bool validatePassword(const char* passwd) {
  const size_t maxPasswordChars = TSDB_PASSWORD_LEN - 1;
  return stringLengthCheck(passwd, maxPasswordChars);
}
35

dengyihao's avatar
dengyihao 已提交
36
/* A database name is accepted when it is non-empty and at most TSDB_DB_NAME_LEN - 1 characters long. */
static bool validateDbName(const char* db) {
  const size_t maxDbNameChars = TSDB_DB_NAME_LEN - 1;
  return stringLengthCheck(db, maxDbNameChars);
}
37

38 39 40 41 42 43
/*
 * Builds the text key "<user>:<auth>:<ip>:<port>" used to look up the
 * per-cluster application instance.
 *
 * Any of the string arguments may be NULL (the caller in this file passes a
 * NULL `ip` when it connects through the configured endpoints). Handing NULL
 * to "%s" is undefined behaviour, so NULL is rendered explicitly as "(null)",
 * the text common C libraries emit, which keeps previously generated keys
 * unchanged.
 *
 * The key is truncated to 511 characters, as before.
 *
 * Returns a heap-allocated, NUL-terminated string that the caller must free,
 * or NULL when the allocation fails.
 */
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
  static const char nullText[] = "(null)";

  const char* safeUser = (user != NULL) ? user : nullText;
  const char* safeAuth = (auth != NULL) ? auth : nullText;
  const char* safeIp = (ip != NULL) ? ip : nullText;

  char key[512] = {0};
  snprintf(key, sizeof(key), "%s:%s:%s:%d", safeUser, safeAuth, safeIp, port);

  // key is zero-initialised and snprintf always terminates, so strlen is safe here.
  size_t len = strlen(key);
  char*  copy = malloc(len + 1);
  if (copy == NULL) {
    return NULL;
  }

  memcpy(copy, key, len + 1);
  return copy;
}

dengyihao's avatar
dengyihao 已提交
44 45 46
static STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
                                SAppInstInfo* pAppInfo);
static void     setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
47

dengyihao's avatar
dengyihao 已提交
48 49
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
                            uint16_t port) {
50 51 52 53
  if (taos_init() != TSDB_CODE_SUCCESS) {
    return NULL;
  }

54 55 56 57 58
  if (!validateUserName(user)) {
    terrno = TSDB_CODE_TSC_INVALID_USER_LENGTH;
    return NULL;
  }

59
  char localDb[TSDB_DB_NAME_LEN] = {0};
60
  if (db != NULL) {
dengyihao's avatar
dengyihao 已提交
61
    if (!validateDbName(db)) {
62 63 64 65
      terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH;
      return NULL;
    }

66 67
    tstrncpy(localDb, db, sizeof(localDb));
    strdequote(localDb);
68 69
  }

70
  char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
71 72 73 74 75 76
  if (auth == NULL) {
    if (!validatePassword(pass)) {
      terrno = TSDB_CODE_TSC_INVALID_PASS_LENGTH;
      return NULL;
    }

dengyihao's avatar
dengyihao 已提交
77
    taosEncryptPass_c((uint8_t*)pass, strlen(pass), secretEncrypt);
78 79 80 81
  } else {
    tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt));
  }

82
  SCorEpSet epSet = {0};
S
Shengliang Guan 已提交
83 84 85 86 87 88 89 90 91 92 93 94 95
  if (ip) {
    if (initEpSetFromCfg(ip, NULL, &epSet) < 0) {
      return NULL;
    }

    if (port) {
      epSet.epSet.eps[0].port = port;
    }
  } else {
    if (initEpSetFromCfg(tsFirst, tsSecond, &epSet) < 0) {
      return NULL;
    }
  }
96

dengyihao's avatar
dengyihao 已提交
97
  char*          key = getClusterKey(user, secretEncrypt, ip, port);
H
Haojun Liao 已提交
98
  SAppInstInfo** pInst = NULL;
99

H
Haojun Liao 已提交
100 101 102
  pthread_mutex_lock(&appInfo.mutex);

  pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
L
Liu Jicong 已提交
103
  SAppInstInfo* p = NULL;
104
  if (pInst == NULL) {
L
Liu Jicong 已提交
105
    p = calloc(1, sizeof(struct SAppInstInfo));
dengyihao's avatar
dengyihao 已提交
106
    p->mgmtEp = epSet;
H
Haojun Liao 已提交
107
    p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
D
dapan 已提交
108
    p->pAppHbMgr = appHbMgrInit(p, key);
H
Haojun Liao 已提交
109
    taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
110

H
Haojun Liao 已提交
111
    pInst = &p;
112 113
  }

H
Haojun Liao 已提交
114 115
  pthread_mutex_unlock(&appInfo.mutex);

H
Haojun Liao 已提交
116
  tfree(key);
H
Haojun Liao 已提交
117
  return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst);
118 119
}

dengyihao's avatar
dengyihao 已提交
120
/*
 * Creates a request object for `pTscObj` and stores a lower-cased,
 * NUL-terminated copy of the first `sqlLen` bytes of `sql` in it.
 *
 * The new request is returned through `*pRequest` (NULL when it could not be
 * created). Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the
 * request or its SQL buffer could not be allocated.
 */
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest) {
  SRequestObj* pNew = createRequest(pTscObj, NULL, NULL, TSDB_SQL_SELECT);
  *pRequest = pNew;
  if (NULL == pNew) {
    tscError("failed to malloc sqlObj");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pNew->sqlstr = malloc(sqlLen + 1);
  if (NULL == pNew->sqlstr) {
    tscError("0x%" PRIx64 " failed to prepare sql string buffer", pNew->self);
    pNew->msgBuf = strdup("failed to prepare sql string buffer");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // keep a lower-cased private copy of the statement text
  strntolower(pNew->sqlstr, sql, (int32_t)sqlLen);
  pNew->sqlstr[sqlLen] = 0;
  pNew->sqlLen = sqlLen;

  tscDebugL("0x%" PRIx64 " SQL: %s, reqId:0x%" PRIx64, pNew->self, pNew->sqlstr, pNew->requestId);
  return TSDB_CODE_SUCCESS;
}
141

X
Xiaoyu Wang 已提交
142
/*
 * Parses the SQL text held by `pRequest` into `*pQuery`.
 *
 * The parse context is filled from the request and its connection; the
 * database name obtained from getDbOfConnection() is released before
 * returning. When parsing succeeds and the query has a result set, the result
 * schema is copied into the request.
 *
 * Returns the code of catalogGetHandle() or qParseQuerySql().
 */
int32_t parseSql(SRequestObj* pRequest, SQuery** pQuery) {
  STscObj* pConn = pRequest->pTscObj;

  SParseContext cxt = {0};
  cxt.requestId = pRequest->requestId;
  cxt.acctId = pConn->acctId;
  cxt.db = getDbOfConnection(pConn);
  cxt.pSql = pRequest->sqlstr;
  cxt.sqlLen = pRequest->sqlLen;
  cxt.pMsg = pRequest->msgBuf;
  cxt.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE;
  cxt.pTransporter = pConn->pAppInfo->pTransporter;
  cxt.mgmtEpSet = getEpSet_s(&pConn->pAppInfo->mgmtEp);

  int32_t code = catalogGetHandle(pConn->pAppInfo->clusterId, &cxt.pCatalog);
  if (TSDB_CODE_SUCCESS == code) {
    code = qParseQuerySql(&cxt, pQuery);
    if (TSDB_CODE_SUCCESS == code && (*pQuery)->haveResultSet) {
      setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols);
    }
  }

  // single exit: the database name copy is released on every path
  tfree(cxt.db);
  return code;
}
H
Haojun Liao 已提交
171

X
Xiaoyu Wang 已提交
172
/*
 * Sends the pre-built command message of `pQuery` to the server and blocks on
 * the request's response semaphore.
 *
 * The message buffer is moved from the query's command info into the request.
 * For TDMT_VND_SHOW_TABLES the vnode list in pExtension is attached to the
 * request (once) so the show can iterate over vnodes.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
  SCmdMsgInfo* pCmd = pQuery->pCmdMsg;

  pRequest->type = pCmd->msgType;
  pRequest->body.requestMsg = (SDataBuf){.pData = pCmd->pMsg, .len = pCmd->msgLen, .handle = NULL};
  pCmd->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management

  STscObj*      pConn = pRequest->pTscObj;
  SMsgSendInfo* pSendInfo = buildMsgInfoImpl(pRequest);

  SShowReqInfo* pShowInfo = &pRequest->body.showInfo;
  if (pCmd->msgType == TDMT_VND_SHOW_TABLES && pShowInfo->pArray == NULL) {
    pShowInfo->currentIndex = 0;  // set the first vnode, then iterate to the next vnode
    pShowInfo->pArray = pCmd->pExtension;
  }

  int64_t transporterId = 0;
  asyncSendMsgToServer(pConn->pAppInfo->pTransporter, &pCmd->epSet, &transporterId, pSendInfo);

  // block until the response has been signalled on the request
  tsem_wait(&pRequest->body.rspSem);
  return TSDB_CODE_SUCCESS;
}
X
Xiaoyu Wang 已提交
194

195 196
/*
 * Records the query's message type on the request and asks the planner to
 * build a query plan for the parsed statement.
 *
 * Returns the code of qCreateQueryPlan(); the plan is returned through `pPlan`
 * and `pNodeList` is handed to the planner.
 */
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
  pRequest->type = pQuery->msgType;

  SPlanContext cxt = {0};
  cxt.queryId = pRequest->requestId;
  cxt.acctId = pRequest->pTscObj->acctId;
  cxt.mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
  cxt.pAstRoot = pQuery->pRoot;

  return qCreateQueryPlan(&cxt, pPlan, pNodeList);
}

210 211
/*
 * Copies the result-set column description (bytes, type, name) from `pSchema`
 * into a newly allocated `pResInfo->fields` array of `numOfCols` entries.
 *
 * The array is sized by the element type of `fields` itself; the schema
 * element type is a different structure and must not be used for the size.
 *
 * On allocation failure numOfCols is set to 0 and terrno to
 * TSDB_CODE_TSC_OUT_OF_MEMORY.
 */
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols) {
  assert(pSchema != NULL && numOfCols > 0);

  pResInfo->fields = calloc(numOfCols, sizeof(pResInfo->fields[0]));
  if (pResInfo->fields == NULL) {
    pResInfo->numOfCols = 0;
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return;
  }

  pResInfo->numOfCols = numOfCols;

  for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
    pResInfo->fields[i].bytes = pSchema[i].bytes;
    pResInfo->fields[i].type = pSchema[i].type;
    tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
  }
}

223

X
Xiaoyu Wang 已提交
224
/*
 * Runs the query plan through the scheduler and records the outcome on the
 * request (errList, code) and in terrno.
 *
 * The scheduler job is released here when execution failed, or when the
 * request is a submit / create-table, in which case the affected row count is
 * copied into the result info.
 *
 * Returns the code stored on the request: the scheduler's return code on
 * failure, otherwise the code reported in the query result.
 */
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
  void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;

  SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
  int32_t      code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr, &res);

  const bool failed = (code != TSDB_CODE_SUCCESS);
  const bool noResultSet =
      !failed && (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_CREATE_TABLE == pRequest->type);

  if (noResultSet) {
    pRequest->body.resInfo.numOfRows = res.numOfRows;
  }

  // the job is no longer needed once it failed or when there is nothing to fetch
  if ((failed || noResultSet) && pRequest->body.queryJob != 0) {
    schedulerFreeJob(pRequest->body.queryJob);
  }

  const int32_t finalCode = failed ? code : res.code;

  pRequest->errList = res.errList;
  pRequest->code = finalCode;
  terrno = finalCode;
  return pRequest->code;
}
X
Xiaoyu Wang 已提交
253

D
dapan1121 已提交
254
/*
 * Executes one attempt of a SQL statement: builds the request, parses the
 * statement, then either sends it directly (directRpc) or plans and schedules
 * it.
 *
 * Each step is wrapped in CHECK_CODE_GOTO, which jumps to the cleanup label
 * when the step fails. The node list and the parsed query are always released.
 *
 * Returns the request object (NULL when it could not be created); when `code`
 * is not TSDB_CODE_SUCCESS at cleanup, terrno is copied into the request.
 */
SRequestObj* execQueryImpl(STscObj* pTscObj, const char* sql, int sqlLen) {
  int32_t      code = 0;
  SQuery*      pQuery = NULL;
  SRequestObj* pRequest = NULL;
  SArray*      pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));

  CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
  CHECK_CODE_GOTO(parseSql(pRequest, &pQuery), _return);

  if (!pQuery->directRpc) {
    // regular query: plan first, then hand the plan to the scheduler
    CHECK_CODE_GOTO(getPlan(pRequest, pQuery, &pRequest->body.pDag, pNodeList), _return);
    CHECK_CODE_GOTO(scheduleQuery(pRequest, pRequest->body.pDag, pNodeList), _return);
  } else {
    CHECK_CODE_GOTO(execDdlQuery(pRequest, pQuery), _return);
  }

_return:
  taosArrayDestroy(pNodeList);
  qDestroyQuery(pQuery);

  if (TSDB_CODE_SUCCESS != code && NULL != pRequest) {
    pRequest->code = terrno;
  }

  return pRequest;
}

D
dapan1121 已提交
280 281 282 283 284 285
/*
 * Filters the error list in place so that only entries the client can act on
 * remain: vgroup-refresh, table-meta-refresh and table-meta-remove errors are
 * kept, every other entry is removed.
 *
 * When no entry is left the list is destroyed and `*pList` is set to NULL.
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t clientProcessErrorList(SArray **pList) {
  SArray *pErrors = *pList;
  int32_t remaining = (int32_t)taosArrayGetSize(pErrors);

  int32_t idx = 0;
  while (idx < remaining) {
    SQueryErrorInfo *pInfo = taosArrayGet(pErrors, idx);

    // TODO REMOVE SAME DB ERROR (for vgroup-refresh entries)
    const bool actionable = NEED_CLIENT_REFRESH_VG_ERROR(pInfo->code) ||
                            NEED_CLIENT_REFRESH_TBLMETA_ERROR(pInfo->code) ||
                            NEED_CLIENT_RM_TBLMETA_ERROR(pInfo->code);
    if (actionable) {
      ++idx;
      continue;
    }

    // removal shifts the following entries down, so the index stays put
    taosArrayRemove(pErrors, idx);
    --remaining;
  }

  if (0 == remaining) {
    taosArrayDestroy(*pList);
    *pList = NULL;
  }

  return TSDB_CODE_SUCCESS;
}


SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
  SRequestObj* pRequest = NULL;
  int32_t code = 0;
D
dapan 已提交
313 314
  int32_t needRetryNum = 0;
  int32_t needRetryFailNum = 0;
D
dapan1121 已提交
315

D
dapan 已提交
316
  while (true) {
D
dapan1121 已提交
317 318 319 320 321 322 323 324 325 326 327 328 329
    pRequest = execQueryImpl(pTscObj, sql, sqlLen);
    if (TSDB_CODE_SUCCESS == pRequest->code || NULL == pRequest->errList) {
      break;
    }

    code = clientProcessErrorList(&pRequest->errList);
    if (code != TSDB_CODE_SUCCESS || NULL == pRequest->errList) {
      break;
    }

    int32_t errNum = (int32_t)taosArrayGetSize(pRequest->errList);
    for (int32_t i = 0; i < errNum; ++i) {
      SQueryErrorInfo *errInfo = taosArrayGet(pRequest->errList, i);
D
dapan 已提交
330
      int32_t tcode = 0;
D
dapan1121 已提交
331
      
D
dapan 已提交
332 333 334
      if (NEED_CLIENT_REFRESH_VG_ERROR(errInfo->code)) {
        ++needRetryNum;
        
D
dapan1121 已提交
335
        SCatalog *pCatalog = NULL;
D
dapan 已提交
336 337 338 339 340
        tcode = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
        if (tcode != TSDB_CODE_SUCCESS) {
          ++needRetryFailNum;
          code = tcode;
          continue;
D
dapan1121 已提交
341
        }
D
dapan 已提交
342
        
D
dapan1121 已提交
343 344 345 346 347
        SEpSet epset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);

        char dbFName[TSDB_DB_FNAME_LEN];
        tNameGetFullDbName(&errInfo->tableName, dbFName);
        
D
dapan 已提交
348 349 350 351 352 353 354 355 356 357 358 359
        tcode = catalogRefreshDBVgInfo(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, dbFName);
        if (tcode != TSDB_CODE_SUCCESS) {
          ++needRetryFailNum;
          code = tcode;
          continue;
        }
      } else if (NEED_CLIENT_RM_TBLMETA_ERROR(errInfo->code)) {
        SCatalog *pCatalog = NULL;
        tcode = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
        if (tcode != TSDB_CODE_SUCCESS) {
          code = tcode;
          continue;
D
dapan1121 已提交
360
        }
D
dapan 已提交
361 362
        
        catalogRemoveTableMeta(pCatalog, &errInfo->tableName);
D
dapan1121 已提交
363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384
      } else if (NEED_CLIENT_REFRESH_TBLMETA_ERROR(errInfo->code)) {
        ++needRetryNum;
        
        SCatalog *pCatalog = NULL;
        tcode = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
        if (tcode != TSDB_CODE_SUCCESS) {
          ++needRetryFailNum;
          code = tcode;
          continue;
        }
        
        SEpSet epset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);

        char dbFName[TSDB_DB_FNAME_LEN];
        tNameGetFullDbName(&errInfo->tableName, dbFName);
        
        tcode = catalogRefreshTableMeta(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, &errInfo->tableName, -1);
        if (tcode != TSDB_CODE_SUCCESS) {
          ++needRetryFailNum;
          code = tcode;
          continue;
        }
D
dapan1121 已提交
385 386
      }
    }
D
dapan1121 已提交
387

D
dapan 已提交
388 389
    if ((needRetryNum && (0 == needRetryFailNum) && (TDMT_VND_SUBMIT != pRequest->type && TDMT_VND_CREATE_TABLE != pRequest->type))
      || (needRetryNum && (needRetryNum > needRetryFailNum) && (TDMT_VND_SUBMIT == pRequest->type && TDMT_VND_CREATE_TABLE == pRequest->type))) {
D
dapan1121 已提交
390
      destroyRequest(pRequest);
D
dapan 已提交
391
      continue;
D
dapan1121 已提交
392
    }
D
dapan 已提交
393 394

    break;
D
dapan1121 已提交
395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414
  }

  if (code) {
    pRequest->code = code;
  }
  
  return pRequest;
}

/*
 * Executes the first `sqlLen` bytes of `sql` on the connection `taos`.
 *
 * Returns the request object as an opaque result handle, or NULL with terrno
 * set to TSDB_CODE_TSC_EXCEED_SQL_LIMIT when the statement is too long.
 */
TAOS_RES* taos_query_l(TAOS* taos, const char* sql, int sqlLen) {
  const size_t maxSqlLen = (size_t)TSDB_MAX_ALLOWED_SQL_LEN;

  // the length is compared as an unsigned value, so a negative sqlLen is
  // rejected by this check as well
  if ((size_t)sqlLen > maxSqlLen) {
    tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    return NULL;
  }

  return execQuery((STscObj*)taos, sql, sqlLen);
}

S
Shengliang Guan 已提交
415 416
int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
  pEpSet->version = 0;
417

H
Haojun Liao 已提交
418
  // init mnode ip set
dengyihao's avatar
dengyihao 已提交
419
  SEpSet* mgmtEpSet = &(pEpSet->epSet);
420
  mgmtEpSet->numOfEps = 0;
dengyihao's avatar
dengyihao 已提交
421
  mgmtEpSet->inUse = 0;
422

S
Shengliang Guan 已提交
423 424 425 426
  if (firstEp && firstEp[0] != 0) {
    if (strlen(firstEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
S
Shengliang Guan 已提交
427
    }
S
Shengliang Guan 已提交
428 429 430 431 432 433 434 435 436

    taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[0]);
    mgmtEpSet->numOfEps++;
  }

  if (secondEp && secondEp[0] != 0) {
    if (strlen(secondEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
437
    }
S
Shengliang Guan 已提交
438 439 440

    taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
    mgmtEpSet->numOfEps++;
441 442 443 444 445 446 447 448 449 450
  }

  if (mgmtEpSet->numOfEps == 0) {
    terrno = TSDB_CODE_TSC_INVALID_FQDN;
    return -1;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
451 452 453
/*
 * Creates a connection object on top of `pAppInfo`, sends the connect request
 * to the management end point and waits for the answer.
 *
 * Returns the connection object on success. On failure every object created
 * here is released and NULL is returned; terrno is set to
 * TSDB_CODE_TSC_OUT_OF_MEMORY on the allocation failure paths.
 */
STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
                         SAppInstInfo* pAppInfo) {
  STscObj* pTscObj = createTscObj(user, auth, db, pAppInfo);
  if (NULL == pTscObj) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return pTscObj;
  }

  SRequestObj* pRequest = createRequest(pTscObj, fp, param, TDMT_MND_CONNECT);
  if (pRequest == NULL) {
    destroyTscObj(pTscObj);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  SMsgSendInfo* body = buildConnectMsg(pRequest);
  if (body == NULL) {
    // nothing can be sent, so waiting on the response semaphore would never return
    destroyRequest(pRequest);
    taos_close(pTscObj);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  int64_t transporterId = 0;
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);

  tsem_wait(&pRequest->body.rspSem);
  if (pRequest->code != TSDB_CODE_SUCCESS) {
    const char* errorMsg =
        (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
    printf("failed to connect to server, reason: %s\n\n", errorMsg);

    destroyRequest(pRequest);
    taos_close(pTscObj);
    pTscObj = NULL;
  } else {
    tscDebug("0x%" PRIx64 " connection is opening, connId:%d, dnodeConn:%p, reqId:0x%" PRIx64, pTscObj->id,
             pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId);
    destroyRequest(pRequest);
  }

  return pTscObj;
}

dengyihao's avatar
dengyihao 已提交
489 490
/*
 * Builds the send-info object carrying a serialized connect request for
 * `pRequest`: database of the connection, process id, start time and
 * application name.
 *
 * Returns the send-info object, which owns the serialized buffer, or NULL
 * with terrno = TSDB_CODE_TSC_OUT_OF_MEMORY when the object or the buffer
 * cannot be prepared. Nothing is leaked on failure.
 */
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
  SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  pMsgSendInfo->msgType = TDMT_MND_CONNECT;

  pMsgSendInfo->requestObjRefId = pRequest->self;
  pMsgSendInfo->requestId = pRequest->requestId;
  pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)];
  pMsgSendInfo->param = pRequest;

  SConnectReq connectReq = {0};
  STscObj*    pObj = pRequest->pTscObj;

  char* db = getDbOfConnection(pObj);
  if (db != NULL) {
    tstrncpy(connectReq.db, db, sizeof(connectReq.db));
  }
  tfree(db);

  connectReq.pid = htonl(appInfo.pid);
  connectReq.startTime = htobe64(appInfo.startTime);
  tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));

  // first pass measures the serialized size, second pass fills the buffer
  int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
  void*   pReq = (contLen > 0) ? malloc(contLen) : NULL;
  if (pReq == NULL) {
    // covers both a failed allocation and an unusable serialized length
    tfree(pMsgSendInfo);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  tSerializeSConnectReq(pReq, contLen, &connectReq);

  pMsgSendInfo->msgInfo.len = contLen;
  pMsgSendInfo->msgInfo.pData = pReq;
  return pMsgSendInfo;
}

525
/*
 * Releases a send-info object together with the message payload it owns.
 * `pMsgBody` must not be NULL.
 */
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
  assert(NULL != pMsgBody);

  // payload first, then the container itself
  tfree(pMsgBody->msgInfo.pData);
  tfree(pMsgBody);
}
dengyihao's avatar
dengyihao 已提交
530
/*
 * Tells whether `msgType` is one of the vnode response types listed below:
 * query, fetch, result-ready or query-heartbeat. The first argument is not
 * used.
 */
bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType) {
  if (msgType == TDMT_VND_QUERY_RSP) {
    return true;
  }
  if (msgType == TDMT_VND_FETCH_RSP) {
    return true;
  }
  if (msgType == TDMT_VND_RES_READY_RSP) {
    return true;
  }
  return msgType == TDMT_VND_QUERY_HEARTBEAT_RSP;
}
533
/*
 * Handles one response message for a previously sent request.
 *
 * When the send info references a request object, the response time is
 * recorded on it, the management end point set is updated if the server
 * reported a different one, and the outcome is logged. The response payload
 * is then copied and handed to the callback stored in the send info, after
 * which the RPC content and the send info are released.
 *
 * pMsg->ahandle must be the SMsgSendInfo that was passed when sending.
 */
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->ahandle;
  assert(pMsg->ahandle != NULL);

  if (pSendInfo->requestObjRefId != 0) {
    SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);

    // the reference may no longer be obtainable; skip the bookkeeping then
    if (pRequest != NULL) {
      assert(pRequest->self == pSendInfo->requestObjRefId);

      pRequest->metric.rsp = taosGetTimestampMs();

      STscObj* pTscObj = pRequest->pTscObj;
      if (pEpSet) {
        if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, pEpSet)) {
          updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
        }
      }

      /*
       * There is no response callback function for a submit response.
       * The actual inserted number of points is the first number.
       */
      int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start;
      if (pMsg->code == TSDB_CODE_SUCCESS) {
        tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%" PRIx64, pRequest->self,
                 TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
      } else {
        tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%" PRIx64, pRequest->self,
                 TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
      }

      taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
    }
  }

  SDataBuf buf = {.len = pMsg->contLen, .pData = NULL, .handle = pMsg->handle};

  if (pMsg->contLen > 0) {
    buf.pData = calloc(1, pMsg->contLen);
    if (buf.pData == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
      buf.len = 0;  // never advertise a length for a payload that is not there
    } else {
      memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    }
  }

  pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pSendInfo);
}
582

dengyihao's avatar
dengyihao 已提交
583
/*
 * Connects with a pre-computed credential instead of a plain password.
 *
 * A NULL `user` falls back to TSDB_DEFAULT_USER. Returns NULL when `auth` is
 * NULL, otherwise the result of taos_connect_internal().
 */
TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) {
  // ip, user and db are all optional; NULL must not be handed to "%s"
  tscDebug("try to connect to %s:%u by auth, user:%s db:%s", (ip != NULL) ? ip : "(null)", port,
           (user != NULL) ? user : "(null)", (db != NULL) ? db : "(null)");
  if (user == NULL) {
    user = TSDB_DEFAULT_USER;
  }

  if (auth == NULL) {
    tscError("No auth info is given, failed to connect to server");
    return NULL;
  }

  return taos_connect_internal(ip, user, NULL, auth, db, port);
}

dengyihao's avatar
dengyihao 已提交
597 598 599
TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen,
                     const char* db, int dbLen, uint16_t port) {
  char ipStr[TSDB_EP_LEN] = {0};
600
  char dbStr[TSDB_DB_NAME_LEN] = {0};
dengyihao's avatar
dengyihao 已提交
601 602
  char userStr[TSDB_USER_LEN] = {0};
  char passStr[TSDB_PASSWORD_LEN] = {0};
603

dengyihao's avatar
dengyihao 已提交
604
  strncpy(ipStr, ip, TMIN(TSDB_EP_LEN - 1, ipLen));
dengyihao's avatar
dengyihao 已提交
605 606
  strncpy(userStr, user, TMIN(TSDB_USER_LEN - 1, userLen));
  strncpy(passStr, pass, TMIN(TSDB_PASSWORD_LEN - 1, passLen));
dengyihao's avatar
dengyihao 已提交
607
  strncpy(dbStr, db, TMIN(TSDB_DB_NAME_LEN - 1, dbLen));
608
  return taos_connect(ipStr, userStr, passStr, dbStr, port);
609 610 611 612
}

/*
 * Returns the next row of the result set of `pRequest`, fetching a new batch
 * from the server when the buffered batch is exhausted.
 *
 * How the next batch is obtained depends on the request type:
 *   - TDMT_VND_QUERY: rows are pulled from the scheduler job.
 *   - TDMT_MND_SHOW / TDMT_MND_SHOW_RETRIEVE: a retrieve message is sent to
 *     the management end point.
 *   - TDMT_VND_SHOW_TABLES / TDMT_VND_SHOW_TABLES_FETCH: vnodes are visited
 *     one after the other; when the current vnode is exhausted a new show
 *     request is sent to the next vnode before fetching from it.
 *
 * Returns the row pointer array of the result info, or NULL when there are no
 * more rows or a step failed (pRequest->code is set for a scheduler fetch
 * failure and for an allocation failure).
 */
void* doFetchRow(SRequestObj* pRequest) {
  assert(pRequest != NULL);
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;

  SEpSet epSet = {0};

  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
    if (pRequest->type == TDMT_VND_QUERY) {
      // All data has returned to App already, no need to try again
      if (pResultInfo->completed) {
        return NULL;
      }

      int32_t code = schedulerFetchRows(pRequest->body.queryJob, (void**)&pResultInfo->pData);
      if (code != TSDB_CODE_SUCCESS) {
        pRequest->code = code;
        return NULL;
      }

      setQueryResultFromRsp(pResultInfo, (SRetrieveTableRsp*)pResultInfo->pData);
      tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
               pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed,
               pRequest->requestId);

      if (pResultInfo->numOfRows == 0) {
        return NULL;
      }

      goto _return;
    } else if (pRequest->type == TDMT_MND_SHOW) {
      pRequest->type = TDMT_MND_SHOW_RETRIEVE;
      epSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
    } else if (pRequest->type == TDMT_VND_SHOW_TABLES) {
      pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
      SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
      SVgroupInfo*  pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex);

      epSet = pVgroupInfo->epSet;
    } else if (pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) {
      // current vnode is exhausted: issue a new show request on the next vnode
      pRequest->type = TDMT_VND_SHOW_TABLES;
      SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
      pShowReqInfo->currentIndex += 1;
      if (pShowReqInfo->currentIndex >= taosArrayGetSize(pShowReqInfo->pArray)) {
        return NULL;
      }

      SVgroupInfo*     pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex);
      SVShowTablesReq* pShowReq = calloc(1, sizeof(SVShowTablesReq));
      if (pShowReq == NULL) {
        pRequest->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
        terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
        return NULL;
      }
      pShowReq->head.vgId = htonl(pVgroupInfo->vgId);

      pRequest->body.requestMsg.len = sizeof(SVShowTablesReq);
      pRequest->body.requestMsg.pData = pShowReq;

      SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
      epSet = pVgroupInfo->epSet;

      int64_t  transporterId = 0;
      STscObj* pTscObj = pRequest->pTscObj;
      asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body);
      tsem_wait(&pRequest->body.rspSem);

      // the fetch for the new vnode is sent by the common code below
      pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
    } else if (pRequest->type == TDMT_MND_SHOW_RETRIEVE) {
      epSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);

      if (pResultInfo->completed) {
        return NULL;
      }
    }

    SMsgSendInfo* body = buildMsgInfoImpl(pRequest);

    int64_t  transporterId = 0;
    STscObj* pTscObj = pRequest->pTscObj;
    asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body);

    tsem_wait(&pRequest->body.rspSem);

    pResultInfo->current = 0;
    if (pResultInfo->numOfRows <= pResultInfo->current) {
      return NULL;
    }
  }

_return:

  // point every column at the current row; variable-length values carry a length header
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
    pResultInfo->row[i] = pResultInfo->pCol[i] + pResultInfo->fields[i].bytes * pResultInfo->current;
    if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
      pResultInfo->length[i] = varDataLen(pResultInfo->row[i]);
      pResultInfo->row[i] = varDataVal(pResultInfo->row[i]);
    }
  }

  pResultInfo->current += 1;
  return pResultInfo->row;
}

H
Haojun Liao 已提交
709 710
/*
 * Allocates the per-column helper arrays of the result info (row pointers,
 * column base pointers, value lengths) the first time it is called for a
 * result; later calls are no-ops once `row` is set.
 */
static void doPrepareResPtr(SReqResultInfo* pResInfo) {
  if (NULL != pResInfo->row) {
    return;
  }

  const int32_t numOfCols = pResInfo->numOfCols;
  pResInfo->row = calloc(numOfCols, POINTER_BYTES);
  pResInfo->pCol = calloc(numOfCols, POINTER_BYTES);
  pResInfo->length = calloc(numOfCols, sizeof(int32_t));
}

717
/*
 * Sets up the per-column pointers of `pResultInfo` for the batch currently
 * stored in pData. The batch is laid out column by column: column i starts at
 * (sum of the byte widths of the preceding columns) * numOfRows.
 *
 * Column widths and the row count are read from `pResultInfo` itself, as
 * before; `pFields` and `numOfRows` are only used for the precondition and
 * the empty-batch shortcut.
 *
 * On allocation failure of the helper arrays terrno is set to
 * TSDB_CODE_TSC_OUT_OF_MEMORY and the pointers are left untouched.
 */
void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) {
  assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL);
  if (numOfRows == 0) {
    return;
  }

  doPrepareResPtr(pResultInfo);
  if (pResultInfo->row == NULL || pResultInfo->pCol == NULL || pResultInfo->length == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return;
  }

  // byte-wise addressing needs a character pointer; arithmetic on void* is not standard C
  char*   pBase = (char*)pResultInfo->pData;
  int64_t offset = 0;  // 64-bit so that offset * numOfRows cannot overflow

  for (int32_t i = 0; i < numOfCols; ++i) {
    pResultInfo->length[i] = pResultInfo->fields[i].bytes;
    pResultInfo->row[i] = pBase + offset * pResultInfo->numOfRows;
    pResultInfo->pCol[i] = pResultInfo->row[i];
    offset += pResultInfo->fields[i].bytes;
  }
}

H
Haojun Liao 已提交
735
/*
 * Returns a heap-allocated copy of the connection's current database name,
 * taken under the connection mutex, or NULL when no database is set.
 * The caller releases the copy.
 */
char* getDbOfConnection(STscObj* pObj) {
  char* pDbCopy = NULL;

  pthread_mutex_lock(&pObj->mutex);
  if (pObj->db[0] != '\0') {
    pDbCopy = strndup(pObj->db, tListLen(pObj->db));
  }
  pthread_mutex_unlock(&pObj->mutex);

  return pDbCopy;
}

/*
 * Replaces the connection's current database name with `db` (truncated to the
 * capacity of the name buffer) while holding the connection mutex.
 * Neither argument may be NULL.
 */
void setConnectionDB(STscObj* pTscObj, const char* db) {
  assert(pTscObj != NULL && db != NULL);

  pthread_mutex_lock(&pTscObj->mutex);
  {
    tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
  }
  pthread_mutex_unlock(&pTscObj->mutex);
}
S
Shengliang Guan 已提交
753

754
/*
 * Installs a retrieve response as the current batch of the result info:
 * remembers the raw message, points pData at its payload, converts the row
 * count from network byte order, resets the read cursor, adds the batch to
 * the running total and rebuilds the per-column pointers.
 */
void setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp) {
  assert(pRsp != NULL && pResultInfo != NULL);

  // raw message and its payload
  pResultInfo->pRspMsg = (const char*)pRsp;
  pResultInfo->pData = (void*)pRsp->data;

  // batch state
  pResultInfo->completed = (pRsp->completed == 1);
  pResultInfo->current = 0;
  pResultInfo->numOfRows = htonl(pRsp->numOfRows);
  pResultInfo->totalRows += pResultInfo->numOfRows;

  setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows);
}