clientImpl.c 24.9 KB
Newer Older
1

2
#include "clientInt.h"
3
#include "clientLog.h"
4
#include "scheduler.h"
H
Haojun Liao 已提交
5
#include "tdatablock.h"
6
#include "tdef.h"
7
#include "tglobal.h"
8
#include "tmsgtype.h"
H
Haojun Liao 已提交
9
#include "tpagedbuf.h"
10
#include "tref.h"
X
Xiaoyu Wang 已提交
11

S
Shengliang Guan 已提交
12
static int32_t       initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
dengyihao's avatar
dengyihao 已提交
13 14
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest);
static void          destroySendMsgInfo(SMsgSendInfo* pMsgBody);
15
static int32_t       setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp);
H
Haojun Liao 已提交
16

17
static bool stringLengthCheck(const char* str, size_t maxsize) {
18 19 20 21 22 23 24 25 26 27 28 29
  if (str == NULL) {
    return false;
  }

  size_t len = strlen(str);
  if (len <= 0 || len > maxsize) {
    return false;
  }

  return true;
}

dengyihao's avatar
dengyihao 已提交
30
static bool validateUserName(const char* user) { return stringLengthCheck(user, TSDB_USER_LEN - 1); }
31

dengyihao's avatar
dengyihao 已提交
32
static bool validatePassword(const char* passwd) { return stringLengthCheck(passwd, TSDB_PASSWORD_LEN - 1); }
33

dengyihao's avatar
dengyihao 已提交
34
static bool validateDbName(const char* db) { return stringLengthCheck(db, TSDB_DB_NAME_LEN - 1); }
35

36 37 38 39 40 41
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
  char key[512] = {0};
  snprintf(key, sizeof(key), "%s:%s:%s:%d", user, auth, ip, port);
  return strdup(key);
}

dengyihao's avatar
dengyihao 已提交
42 43 44
static STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
                                SAppInstInfo* pAppInfo);
static void     setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
45

dengyihao's avatar
dengyihao 已提交
46 47
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
                            uint16_t port) {
48 49 50 51
  if (taos_init() != TSDB_CODE_SUCCESS) {
    return NULL;
  }

52 53 54 55 56
  if (!validateUserName(user)) {
    terrno = TSDB_CODE_TSC_INVALID_USER_LENGTH;
    return NULL;
  }

57
  char localDb[TSDB_DB_NAME_LEN] = {0};
58
  if (db != NULL) {
dengyihao's avatar
dengyihao 已提交
59
    if (!validateDbName(db)) {
60 61 62 63
      terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH;
      return NULL;
    }

64 65
    tstrncpy(localDb, db, sizeof(localDb));
    strdequote(localDb);
66 67
  }

68
  char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
69 70 71 72 73 74
  if (auth == NULL) {
    if (!validatePassword(pass)) {
      terrno = TSDB_CODE_TSC_INVALID_PASS_LENGTH;
      return NULL;
    }

dengyihao's avatar
dengyihao 已提交
75
    taosEncryptPass_c((uint8_t*)pass, strlen(pass), secretEncrypt);
76 77 78 79
  } else {
    tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt));
  }

80
  SCorEpSet epSet = {0};
S
Shengliang Guan 已提交
81 82 83 84 85 86 87 88 89 90 91 92 93
  if (ip) {
    if (initEpSetFromCfg(ip, NULL, &epSet) < 0) {
      return NULL;
    }

    if (port) {
      epSet.epSet.eps[0].port = port;
    }
  } else {
    if (initEpSetFromCfg(tsFirst, tsSecond, &epSet) < 0) {
      return NULL;
    }
  }
94

dengyihao's avatar
dengyihao 已提交
95
  char*          key = getClusterKey(user, secretEncrypt, ip, port);
H
Haojun Liao 已提交
96
  SAppInstInfo** pInst = NULL;
97

wafwerar's avatar
wafwerar 已提交
98
  taosThreadMutexLock(&appInfo.mutex);
H
Haojun Liao 已提交
99 100

  pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
L
Liu Jicong 已提交
101
  SAppInstInfo* p = NULL;
102
  if (pInst == NULL) {
wafwerar's avatar
wafwerar 已提交
103
    p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo));
dengyihao's avatar
dengyihao 已提交
104
    p->mgmtEp = epSet;
H
Haojun Liao 已提交
105
    p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
D
dapan 已提交
106
    p->pAppHbMgr = appHbMgrInit(p, key);
H
Haojun Liao 已提交
107
    taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
108

H
Haojun Liao 已提交
109
    pInst = &p;
110 111
  }

wafwerar's avatar
wafwerar 已提交
112
  taosThreadMutexUnlock(&appInfo.mutex);
H
Haojun Liao 已提交
113

wafwerar's avatar
wafwerar 已提交
114
  taosMemoryFreeClear(key);
H
Haojun Liao 已提交
115
  return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst);
116 117
}

dengyihao's avatar
dengyihao 已提交
118
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest) {
X
Xiaoyu Wang 已提交
119 120
  *pRequest = createRequest(pTscObj, NULL, NULL, TSDB_SQL_SELECT);
  if (*pRequest == NULL) {
121
    tscError("failed to malloc sqlObj");
X
Xiaoyu Wang 已提交
122
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
123 124
  }

wafwerar's avatar
wafwerar 已提交
125
  (*pRequest)->sqlstr = taosMemoryMalloc(sqlLen + 1);
X
Xiaoyu Wang 已提交
126
  if ((*pRequest)->sqlstr == NULL) {
dengyihao's avatar
dengyihao 已提交
127
    tscError("0x%" PRIx64 " failed to prepare sql string buffer", (*pRequest)->self);
X
Xiaoyu Wang 已提交
128 129
    (*pRequest)->msgBuf = strdup("failed to prepare sql string buffer");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
130 131
  }

X
Xiaoyu Wang 已提交
132 133 134
  strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen);
  (*pRequest)->sqlstr[sqlLen] = 0;
  (*pRequest)->sqlLen = sqlLen;
135

dengyihao's avatar
dengyihao 已提交
136
  tscDebugL("0x%" PRIx64 " SQL: %s, reqId:0x%" PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId);
X
Xiaoyu Wang 已提交
137 138
  return TSDB_CODE_SUCCESS;
}
139

X
Xiaoyu Wang 已提交
140
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery) {
H
Haojun Liao 已提交
141 142
  STscObj* pTscObj = pRequest->pTscObj;

X
Xiaoyu Wang 已提交
143
  SParseContext cxt = {
dengyihao's avatar
dengyihao 已提交
144 145
      .requestId = pRequest->requestId,
      .acctId = pTscObj->acctId,
X
Xiaoyu Wang 已提交
146
      .db = pRequest->pDb,
X
Xiaoyu Wang 已提交
147
      .topicQuery = topicQuery,
dengyihao's avatar
dengyihao 已提交
148 149 150 151 152
      .pSql = pRequest->sqlstr,
      .sqlLen = pRequest->sqlLen,
      .pMsg = pRequest->msgBuf,
      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
      .pTransporter = pTscObj->pAppInfo->pTransporter,
X
Xiaoyu Wang 已提交
153
  };
H
Haojun Liao 已提交
154

H
Haojun Liao 已提交
155 156
  cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
H
Haojun Liao 已提交
157 158 159 160 161
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  code = qParseQuerySql(&cxt, pQuery);
X
Xiaoyu Wang 已提交
162 163 164 165 166 167
  if (TSDB_CODE_SUCCESS == code) {
    if ((*pQuery)->haveResultSet) {
      setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols);
    }
    TSWAP(pRequest->dbList, (*pQuery)->pDbList, SArray*);
    TSWAP(pRequest->tableList, (*pQuery)->pTableList, SArray*);
X
Xiaoyu Wang 已提交
168
  }
169

X
Xiaoyu Wang 已提交
170 171
  return code;
}
H
Haojun Liao 已提交
172

X
Xiaoyu Wang 已提交
173
int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
174 175 176
  SCmdMsgInfo* pMsgInfo = pQuery->pCmdMsg;
  pRequest->type = pMsgInfo->msgType;
  pRequest->body.requestMsg = (SDataBuf){.pData = pMsgInfo->pMsg, .len = pMsgInfo->msgLen, .handle = NULL};
X
Xiaoyu Wang 已提交
177
  pMsgInfo->pMsg = NULL; // pMsg transferred to SMsgSendInfo management
178 179 180

  STscObj*      pTscObj = pRequest->pTscObj;
  SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
181 182 183 184 185 186 187 188

  if (pMsgInfo->msgType == TDMT_VND_SHOW_TABLES) {
    SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
    if (pShowReqInfo->pArray == NULL) {
      pShowReqInfo->currentIndex = 0;  // set the first vnode/ then iterate the next vnode
      pShowReqInfo->pArray = pMsgInfo->pExtension;
    }
  }
189 190
  int64_t transporterId = 0;
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg);
X
Xiaoyu Wang 已提交
191

192
  tsem_wait(&pRequest->body.rspSem);
X
Xiaoyu Wang 已提交
193 194
  return TSDB_CODE_SUCCESS;
}
X
Xiaoyu Wang 已提交
195

196 197
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
  pRequest->type = pQuery->msgType;
X
Xiaoyu Wang 已提交
198 199 200 201
  SPlanContext cxt = {
    .queryId = pRequest->requestId,
    .acctId = pRequest->pTscObj->acctId,
    .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
D
dapan1121 已提交
202 203
    .pAstRoot = pQuery->pRoot,
    .showRewrite = pQuery->showRewrite
X
Xiaoyu Wang 已提交
204
  };
205
  int32_t  code = qCreateQueryPlan(&cxt, pPlan, pNodeList);
X
Xiaoyu Wang 已提交
206 207 208 209
  if (code != 0) {
    return code;
  }
  return code;
X
Xiaoyu Wang 已提交
210 211
}

212 213
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols) {
  assert(pSchema != NULL && numOfCols > 0);
214

215
  pResInfo->numOfCols = numOfCols;
wafwerar's avatar
wafwerar 已提交
216
  pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(pSchema[0]));
217 218

  for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
219
    pResInfo->fields[i].bytes = pSchema[i].bytes;
dengyihao's avatar
dengyihao 已提交
220
    pResInfo->fields[i].type = pSchema[i].type;
221
    tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
222
  }
X
Xiaoyu Wang 已提交
223 224
}

X
Xiaoyu Wang 已提交
225
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
226
  void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
227

D
dapan1121 已提交
228
  SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
D
dapan1121 已提交
229
  int32_t      code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr, &res);
D
dapan1121 已提交
230
  if (code != TSDB_CODE_SUCCESS) {
D
dapan1121 已提交
231 232
    if (pRequest->body.queryJob != 0) {
      schedulerFreeJob(pRequest->body.queryJob);
233 234
    }

D
dapan1121 已提交
235
    pRequest->code = code;
D
dapan1121 已提交
236
    terrno = code;
H
Haojun Liao 已提交
237
    return pRequest->code;
X
Xiaoyu Wang 已提交
238
  }
239

240
  if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_CREATE_TABLE == pRequest->type) {
D
dapan1121 已提交
241
    pRequest->body.resInfo.numOfRows = res.numOfRows;
dengyihao's avatar
test  
dengyihao 已提交
242

D
dapan1121 已提交
243 244
    if (pRequest->body.queryJob != 0) {
      schedulerFreeJob(pRequest->body.queryJob);
D
dapan1121 已提交
245 246
    }
  }
D
dapan1121 已提交
247

D
dapan1121 已提交
248
  pRequest->code = res.code;
D
dapan1121 已提交
249
  terrno = res.code;  
D
dapan1121 已提交
250
  return pRequest->code;
X
Xiaoyu Wang 已提交
251
}
X
Xiaoyu Wang 已提交
252

D
dapan1121 已提交
253
SRequestObj* execQueryImpl(STscObj* pTscObj, const char* sql, int sqlLen) {
S
Shengliang Guan 已提交
254
  SRequestObj* pRequest = NULL;
X
Xiaoyu Wang 已提交
255
  SQuery* pQuery = NULL;
D
dapan1121 已提交
256
  int32_t code = 0;
X
Xiaoyu Wang 已提交
257
  SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
X
Xiaoyu Wang 已提交
258 259

  CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
X
Xiaoyu Wang 已提交
260
  CHECK_CODE_GOTO(parseSql(pRequest, false, &pQuery), _return);
H
Haojun Liao 已提交
261

262
  if (pQuery->directRpc) {
X
Xiaoyu Wang 已提交
263
    CHECK_CODE_GOTO(execDdlQuery(pRequest, pQuery), _return);
H
Haojun Liao 已提交
264
  } else {
X
Xiaoyu Wang 已提交
265
    CHECK_CODE_GOTO(getPlan(pRequest, pQuery, &pRequest->body.pDag, pNodeList), _return);
266
    CHECK_CODE_GOTO(scheduleQuery(pRequest, pRequest->body.pDag, pNodeList), _return);
X
Xiaoyu Wang 已提交
267 268 269
  }

_return:
270
  taosArrayDestroy(pNodeList);
X
Xiaoyu Wang 已提交
271
  qDestroyQuery(pQuery);
D
dapan1121 已提交
272
  if (NULL != pRequest && TSDB_CODE_SUCCESS != code) {
X
Xiaoyu Wang 已提交
273 274
    pRequest->code = terrno;
  }
275

276 277 278
  return pRequest;
}

D
dapan1121 已提交
279 280 281 282 283 284 285 286 287
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
  SCatalog *pCatalog = NULL;
  int32_t code = 0;
  int32_t dbNum = taosArrayGetSize(pRequest->dbList);
  int32_t tblNum = taosArrayGetSize(pRequest->tableList);

  if (dbNum <= 0 && tblNum <= 0) {
    return TSDB_CODE_QRY_APP_ERROR;
  }
D
dapan1121 已提交
288
  
D
dapan1121 已提交
289 290 291 292 293 294 295 296 297 298 299 300 301
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SEpSet epset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);

  for (int32_t i = 0; i < dbNum; ++i) {
    char *dbFName = taosArrayGet(pRequest->dbList, i);
    
    code = catalogRefreshDBVgInfo(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, dbFName);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
D
dapan1121 已提交
302 303 304
    }
  }

D
dapan1121 已提交
305 306 307 308 309 310 311
  for (int32_t i = 0; i < tblNum; ++i) {
    SName *tableName = taosArrayGet(pRequest->tableList, i);

    code = catalogRefreshTableMeta(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, tableName, -1);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
D
dapan1121 已提交
312 313
  }

D
dapan1121 已提交
314
  return code;
D
dapan1121 已提交
315 316 317 318 319
}


SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
  SRequestObj* pRequest = NULL;
D
dapan1121 已提交
320
  int32_t retryNum = 0;
D
dapan1121 已提交
321 322
  int32_t code = 0;

D
dapan1121 已提交
323
  while (retryNum++ < REQUEST_MAX_TRY_TIMES) {
D
dapan1121 已提交
324
    pRequest = execQueryImpl(pTscObj, sql, sqlLen);
D
dapan1121 已提交
325
    if (TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) {
D
dapan1121 已提交
326 327 328
      break;
    }

D
dapan1121 已提交
329 330 331
    code = refreshMeta(pTscObj, pRequest);
    if (code) {
      pRequest->code = code;
D
dapan1121 已提交
332 333
      break;
    }
D
dapan1121 已提交
334 335

    destroyRequest(pRequest);
D
dapan1121 已提交
336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351
  }
  
  return pRequest;
}

TAOS_RES* taos_query_l(TAOS* taos, const char* sql, int sqlLen) {
  STscObj* pTscObj = (STscObj*)taos;
  if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
    tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    return NULL;
  }

  return execQuery(pTscObj, sql, sqlLen);
}

S
Shengliang Guan 已提交
352 353
int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
  pEpSet->version = 0;
354

H
Haojun Liao 已提交
355
  // init mnode ip set
dengyihao's avatar
dengyihao 已提交
356
  SEpSet* mgmtEpSet = &(pEpSet->epSet);
357
  mgmtEpSet->numOfEps = 0;
dengyihao's avatar
dengyihao 已提交
358
  mgmtEpSet->inUse = 0;
359

S
Shengliang Guan 已提交
360 361 362 363
  if (firstEp && firstEp[0] != 0) {
    if (strlen(firstEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
S
Shengliang Guan 已提交
364
    }
S
Shengliang Guan 已提交
365 366 367 368 369 370 371 372 373

    taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[0]);
    mgmtEpSet->numOfEps++;
  }

  if (secondEp && secondEp[0] != 0) {
    if (strlen(secondEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
374
    }
S
Shengliang Guan 已提交
375 376 377

    taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
    mgmtEpSet->numOfEps++;
378 379 380 381 382 383 384 385 386 387
  }

  if (mgmtEpSet->numOfEps == 0) {
    terrno = TSDB_CODE_TSC_INVALID_FQDN;
    return -1;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
388 389 390
STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
                         SAppInstInfo* pAppInfo) {
  STscObj* pTscObj = createTscObj(user, auth, db, pAppInfo);
391 392 393 394 395
  if (NULL == pTscObj) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return pTscObj;
  }

dengyihao's avatar
dengyihao 已提交
396
  SRequestObj* pRequest = createRequest(pTscObj, fp, param, TDMT_MND_CONNECT);
397 398 399
  if (pRequest == NULL) {
    destroyTscObj(pTscObj);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
400 401 402
    return NULL;
  }

403
  SMsgSendInfo* body = buildConnectMsg(pRequest);
404 405

  int64_t transporterId = 0;
H
Haojun Liao 已提交
406
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
407 408 409

  tsem_wait(&pRequest->body.rspSem);
  if (pRequest->code != TSDB_CODE_SUCCESS) {
dengyihao's avatar
dengyihao 已提交
410 411
    const char* errorMsg =
        (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
412 413 414 415 416 417
    printf("failed to connect to server, reason: %s\n\n", errorMsg);

    destroyRequest(pRequest);
    taos_close(pTscObj);
    pTscObj = NULL;
  } else {
dengyihao's avatar
dengyihao 已提交
418 419
    tscDebug("0x%" PRIx64 " connection is opening, connId:%d, dnodeConn:%p, reqId:0x%" PRIx64, pTscObj->id,
             pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId);
420 421 422 423 424 425
    destroyRequest(pRequest);
  }

  return pTscObj;
}

dengyihao's avatar
dengyihao 已提交
426
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
wafwerar's avatar
wafwerar 已提交
427
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
428 429 430 431 432
  if (pMsgSendInfo == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

dengyihao's avatar
dengyihao 已提交
433
  pMsgSendInfo->msgType = TDMT_MND_CONNECT;
434

435
  pMsgSendInfo->requestObjRefId = pRequest->self;
dengyihao's avatar
dengyihao 已提交
436 437 438
  pMsgSendInfo->requestId = pRequest->requestId;
  pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)];
  pMsgSendInfo->param = pRequest;
439

S
Shengliang Guan 已提交
440 441
  SConnectReq connectReq = {0};
  STscObj*    pObj = pRequest->pTscObj;
442

H
Haojun Liao 已提交
443
  char* db = getDbOfConnection(pObj);
444
  if (db != NULL) {
S
Shengliang Guan 已提交
445
    tstrncpy(connectReq.db, db, sizeof(connectReq.db));
446
  }
wafwerar's avatar
wafwerar 已提交
447
  taosMemoryFreeClear(db);
448

S
Shengliang Guan 已提交
449 450 451 452 453
  connectReq.pid = htonl(appInfo.pid);
  connectReq.startTime = htobe64(appInfo.startTime);
  tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));

  int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
wafwerar's avatar
wafwerar 已提交
454
  void*   pReq = taosMemoryMalloc(contLen);
S
Shengliang Guan 已提交
455
  tSerializeSConnectReq(pReq, contLen, &connectReq);
456

S
Shengliang Guan 已提交
457 458
  pMsgSendInfo->msgInfo.len = contLen;
  pMsgSendInfo->msgInfo.pData = pReq;
459
  return pMsgSendInfo;
460 461
}

462
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
H
Haojun Liao 已提交
463
  assert(pMsgBody != NULL);
wafwerar's avatar
wafwerar 已提交
464 465
  taosMemoryFreeClear(pMsgBody->msgInfo.pData);
  taosMemoryFreeClear(pMsgBody);
H
Haojun Liao 已提交
466
}
dengyihao's avatar
dengyihao 已提交
467
bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType) {
D
dapan1121 已提交
468
  return msgType == TDMT_VND_QUERY_RSP || msgType == TDMT_VND_FETCH_RSP || msgType == TDMT_VND_RES_READY_RSP || msgType == TDMT_VND_QUERY_HEARTBEAT_RSP;
dengyihao's avatar
dengyihao 已提交
469
}
470
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
dengyihao's avatar
dengyihao 已提交
471
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->ahandle;
472
  assert(pMsg->ahandle != NULL);
473

474
  if (pSendInfo->requestObjRefId != 0) {
dengyihao's avatar
dengyihao 已提交
475
    SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
476
    assert(pRequest->self == pSendInfo->requestObjRefId);
477

478
    pRequest->metric.rsp = taosGetTimestampMs();
479

dengyihao's avatar
dengyihao 已提交
480
    STscObj* pTscObj = pRequest->pTscObj;
481 482 483 484
    if (pEpSet) {
      if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, pEpSet)) {
        updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
      }
485 486
    }

487
    /*
dengyihao's avatar
dengyihao 已提交
488 489
     * There is not response callback function for submit response.
     * The actual inserted number of points is the first number.
490
     */
491
    int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start;
492
    if (pMsg->code == TSDB_CODE_SUCCESS) {
dengyihao's avatar
dengyihao 已提交
493 494
      tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%" PRIx64, pRequest->self,
               TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
495
    } else {
dengyihao's avatar
dengyihao 已提交
496 497
      tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%" PRIx64, pRequest->self,
               TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
498
    }
499

H
Haojun Liao 已提交
500
    taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
501 502
  }

dengyihao's avatar
dengyihao 已提交
503
  SDataBuf buf = {.len = pMsg->contLen, .pData = NULL, .handle = pMsg->handle};
504 505

  if (pMsg->contLen > 0) {
wafwerar's avatar
wafwerar 已提交
506
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
507 508 509 510 511 512
    if (buf.pData == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    }
513 514
  }

515
  pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
516
  rpcFreeCont(pMsg->pCont);
517
  destroySendMsgInfo(pSendInfo);
518
}
519

dengyihao's avatar
dengyihao 已提交
520
TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) {
521 522 523 524 525 526 527 528 529 530 531 532 533
  tscDebug("try to connect to %s:%u by auth, user:%s db:%s", ip, port, user, db);
  if (user == NULL) {
    user = TSDB_DEFAULT_USER;
  }

  if (auth == NULL) {
    tscError("No auth info is given, failed to connect to server");
    return NULL;
  }

  return taos_connect_internal(ip, user, NULL, auth, db, port);
}

dengyihao's avatar
dengyihao 已提交
534 535 536
TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen,
                     const char* db, int dbLen, uint16_t port) {
  char ipStr[TSDB_EP_LEN] = {0};
537
  char dbStr[TSDB_DB_NAME_LEN] = {0};
dengyihao's avatar
dengyihao 已提交
538 539
  char userStr[TSDB_USER_LEN] = {0};
  char passStr[TSDB_PASSWORD_LEN] = {0};
540

dengyihao's avatar
dengyihao 已提交
541
  strncpy(ipStr, ip, TMIN(TSDB_EP_LEN - 1, ipLen));
dengyihao's avatar
dengyihao 已提交
542 543
  strncpy(userStr, user, TMIN(TSDB_USER_LEN - 1, userLen));
  strncpy(passStr, pass, TMIN(TSDB_PASSWORD_LEN - 1, passLen));
dengyihao's avatar
dengyihao 已提交
544
  strncpy(dbStr, db, TMIN(TSDB_DB_NAME_LEN - 1, dbLen));
545
  return taos_connect(ipStr, userStr, passStr, dbStr, port);
546 547
}

548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574
static void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
    SResultColumn* pCol = &pResultInfo->pCol[i];

    int32_t type = pResultInfo->fields[i].type;
    int32_t bytes = pResultInfo->fields[i].bytes;

    if (IS_VAR_DATA_TYPE(type)) {
      if (pCol->offset[pResultInfo->current] != -1) {
        char* pStart = pResultInfo->pCol[i].offset[pResultInfo->current] + pResultInfo->pCol[i].pData;

        pResultInfo->length[i] = varDataLen(pStart);
        pResultInfo->row[i] = varDataVal(pStart);
      } else {
        pResultInfo->row[i] = NULL;
      }
    } else {
      if (!colDataIsNull_f(pCol->nullbitmap, pResultInfo->current)) {
        pResultInfo->row[i] = pResultInfo->pCol[i].pData + bytes * pResultInfo->current;
      } else {
        pResultInfo->row[i] = NULL;
      }
    }
  }
}

void* doFetchRow(SRequestObj* pRequest, bool setupOneRowPtr) {
575
  assert(pRequest != NULL);
576
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
577

H
Haojun Liao 已提交
578 579
  SEpSet epSet = {0};

H
Haojun Liao 已提交
580
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
581
    if (pRequest->type == TDMT_VND_QUERY) {
582 583
      // All data has returned to App already, no need to try again
      if (pResultInfo->completed) {
584
        pResultInfo->numOfRows = 0;
585 586 587
        return NULL;
      }

588
      SReqResultInfo* pResInfo = &pRequest->body.resInfo;
589 590
      pRequest->code = schedulerFetchRows(pRequest->body.queryJob, (void**)&pResInfo->pData);
      if (pRequest->code != TSDB_CODE_SUCCESS) {
591
        pResultInfo->numOfRows = 0;
592 593 594 595 596
        return NULL;
      }

      pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData);
      if (pRequest->code != TSDB_CODE_SUCCESS) {
597
        pResultInfo->numOfRows = 0;
598 599
        return NULL;
      }
H
Haojun Liao 已提交
600

dengyihao's avatar
dengyihao 已提交
601 602
      tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
               pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId);
603

604
      if (pResultInfo->numOfRows == 0) {
D
dapan1121 已提交
605 606 607 608
        return NULL;
      }

      goto _return;
609
    } else if (pRequest->type == TDMT_MND_SHOW) {
H
Haojun Liao 已提交
610
      pRequest->type = TDMT_MND_SHOW_RETRIEVE;
H
Haojun Liao 已提交
611
      epSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
H
Haojun Liao 已提交
612 613
    } else if (pRequest->type == TDMT_VND_SHOW_TABLES) {
      pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
H
Haojun Liao 已提交
614
      SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
dengyihao's avatar
dengyihao 已提交
615
      SVgroupInfo*  pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex);
H
Haojun Liao 已提交
616

L
Liu Jicong 已提交
617
      epSet = pVgroupInfo->epSet;
H
Haojun Liao 已提交
618 619 620 621 622 623 624 625
    } else if (pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) {
      pRequest->type = TDMT_VND_SHOW_TABLES;
      SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
      pShowReqInfo->currentIndex += 1;
      if (pShowReqInfo->currentIndex >= taosArrayGetSize(pShowReqInfo->pArray)) {
        return NULL;
      }

dengyihao's avatar
dengyihao 已提交
626
      SVgroupInfo*     pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex);
wafwerar's avatar
wafwerar 已提交
627
      SVShowTablesReq* pShowReq = taosMemoryCalloc(1, sizeof(SVShowTablesReq));
H
Haojun Liao 已提交
628 629 630 631 632 633
      pShowReq->head.vgId = htonl(pVgroupInfo->vgId);

      pRequest->body.requestMsg.len = sizeof(SVShowTablesReq);
      pRequest->body.requestMsg.pData = pShowReq;

      SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
L
Liu Jicong 已提交
634
      epSet = pVgroupInfo->epSet;
H
Haojun Liao 已提交
635

H
Haojun Liao 已提交
636
      int64_t  transporterId = 0;
dengyihao's avatar
dengyihao 已提交
637
      STscObj* pTscObj = pRequest->pTscObj;
H
Haojun Liao 已提交
638
      asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body);
H
Haojun Liao 已提交
639 640 641
      tsem_wait(&pRequest->body.rspSem);

      pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
dengyihao's avatar
dengyihao 已提交
642
    } else if (pRequest->type == TDMT_MND_SHOW_RETRIEVE) {
D
dapan1121 已提交
643
      epSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
dengyihao's avatar
dengyihao 已提交
644

D
dapan1121 已提交
645 646 647
      if (pResultInfo->completed) {
        return NULL;
      }
H
Haojun Liao 已提交
648
    }
H
Haojun Liao 已提交
649

650
    SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
H
Haojun Liao 已提交
651

652
    int64_t  transporterId = 0;
dengyihao's avatar
dengyihao 已提交
653
    STscObj* pTscObj = pRequest->pTscObj;
H
Haojun Liao 已提交
654
    asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body);
H
Haojun Liao 已提交
655 656 657 658 659 660 661 662 663

    tsem_wait(&pRequest->body.rspSem);

    pResultInfo->current = 0;
    if (pResultInfo->numOfRows <= pResultInfo->current) {
      return NULL;
    }
  }

D
dapan1121 已提交
664
_return:
665 666 667
  if (setupOneRowPtr) {
    doSetOneRowPtr(pResultInfo);
    pResultInfo->current += 1;
H
Haojun Liao 已提交
668 669 670 671 672
  }

  return pResultInfo->row;
}

673
static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
H
Haojun Liao 已提交
674
  if (pResInfo->row == NULL) {
wafwerar's avatar
wafwerar 已提交
675 676 677 678
    pResInfo->row    = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
    pResInfo->pCol   = taosMemoryCalloc(pResInfo->numOfCols, sizeof(SResultColumn));
    pResInfo->length = taosMemoryCalloc(pResInfo->numOfCols, sizeof(int32_t));
    pResInfo->convertBuf = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
679

680 681 682
    if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL || pResInfo->convertBuf == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
683
  }
684 685

  return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
686 687
}

688
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) {
H
Haojun Liao 已提交
689 690
  assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL);
  if (numOfRows == 0) {
691
    return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
692 693
  }

694 695 696 697
  int32_t code = doPrepareResPtr(pResultInfo);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }
H
Haojun Liao 已提交
698

699 700
  int32_t* colLength = (int32_t*)pResultInfo->pData;
  char*    pStart = ((char*)pResultInfo->pData) + sizeof(int32_t) * numOfCols;
H
Haojun Liao 已提交
701
  for (int32_t i = 0; i < numOfCols; ++i) {
702 703 704 705 706 707 708 709 710 711 712
    colLength[i] = htonl(colLength[i]);

    if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
      pResultInfo->pCol[i].offset = (int32_t*)pStart;
      pStart += numOfRows * sizeof(int32_t);
    } else {
      pResultInfo->pCol[i].nullbitmap = pStart;
      pStart += BitmapLen(pResultInfo->numOfRows);
    }

    pResultInfo->pCol[i].pData = pStart;
H
Haojun Liao 已提交
713
    pResultInfo->length[i] = pResultInfo->fields[i].bytes;
714 715 716
    pResultInfo->row[i] = pResultInfo->pCol[i].pData;

    pStart += colLength[i];
717
  }
718

719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747
  for (int32_t i = 0; i < numOfCols; ++i) {
    int32_t type = pResultInfo->fields[i].type;
    int32_t bytes = pResultInfo->fields[i].bytes;

    if (type == TSDB_DATA_TYPE_NCHAR) {
      char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]);
      if (p == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }
      pResultInfo->convertBuf[i] = p;

      SResultColumn* pCol = &pResultInfo->pCol[i];
      for (int32_t j = 0; j < numOfRows; ++j) {
        if (pCol->offset[j] != -1) {
          pStart = pCol->offset[j] + pCol->pData;

          int32_t len = taosUcs4ToMbs((TdUcs4*)varDataVal(pStart), varDataLen(pStart), varDataVal(p));
          ASSERT(len <= bytes);

          varDataSetLen(p, len);
          pCol->offset[j] = (p - pResultInfo->convertBuf[i]);
          p += (len + VARSTR_HEADER_SIZE);
        }
      }

      pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
    }
  }

748
  return TSDB_CODE_SUCCESS;
S
Shengliang Guan 已提交
749 750
}

H
Haojun Liao 已提交
751
char* getDbOfConnection(STscObj* pObj) {
dengyihao's avatar
dengyihao 已提交
752
  char* p = NULL;
wafwerar's avatar
wafwerar 已提交
753
  taosThreadMutexLock(&pObj->mutex);
754 755 756 757
  size_t len = strlen(pObj->db);
  if (len > 0) {
    p = strndup(pObj->db, tListLen(pObj->db));
  }
S
Shengliang Guan 已提交
758

wafwerar's avatar
wafwerar 已提交
759
  taosThreadMutexUnlock(&pObj->mutex);
760 761 762 763 764
  return p;
}

void setConnectionDB(STscObj* pTscObj, const char* db) {
  assert(db != NULL && pTscObj != NULL);
wafwerar's avatar
wafwerar 已提交
765
  taosThreadMutexLock(&pTscObj->mutex);
766
  tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
wafwerar's avatar
wafwerar 已提交
767
  taosThreadMutexUnlock(&pTscObj->mutex);
768
}
S
Shengliang Guan 已提交
769

770
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp) {
771 772
  assert(pResultInfo != NULL && pRsp != NULL);

773 774 775 776 777 778
  pResultInfo->pRspMsg    = (const char*)pRsp;
  pResultInfo->pData      = (void*)pRsp->data;
  pResultInfo->numOfRows  = htonl(pRsp->numOfRows);
  pResultInfo->current    = 0;
  pResultInfo->completed  = (pRsp->completed == 1);
  pResultInfo->payloadLen = htonl(pRsp->compLen);
H
Haojun Liao 已提交
779

780
  // TODO handle the compressed case
781
  pResultInfo->totalRows += pResultInfo->numOfRows;
782
  return setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows);
L
fix  
Liu Jicong 已提交
783
}