clientImpl.c 32.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15

16
#include "clientInt.h"
17
#include "clientLog.h"
18
#include "command.h"
19
#include "scheduler.h"
H
Haojun Liao 已提交
20
#include "tdatablock.h"
21
#include "tdef.h"
22
#include "tglobal.h"
23
#include "tmsgtype.h"
H
Haojun Liao 已提交
24
#include "tpagedbuf.h"
25
#include "tref.h"
X
Xiaoyu Wang 已提交
26

S
Shengliang Guan 已提交
27
static int32_t       initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
L
Liu Jicong 已提交
28
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest, int8_t connType);
dengyihao's avatar
dengyihao 已提交
29
static void          destroySendMsgInfo(SMsgSendInfo* pMsgBody);
H
Haojun Liao 已提交
30

31
static bool stringLengthCheck(const char* str, size_t maxsize) {
32 33 34 35 36 37 38 39 40 41 42 43
  if (str == NULL) {
    return false;
  }

  size_t len = strlen(str);
  if (len <= 0 || len > maxsize) {
    return false;
  }

  return true;
}

dengyihao's avatar
dengyihao 已提交
44
static bool validateUserName(const char* user) { return stringLengthCheck(user, TSDB_USER_LEN - 1); }
45

dengyihao's avatar
dengyihao 已提交
46
static bool validatePassword(const char* passwd) { return stringLengthCheck(passwd, TSDB_PASSWORD_LEN - 1); }
47

dengyihao's avatar
dengyihao 已提交
48
static bool validateDbName(const char* db) { return stringLengthCheck(db, TSDB_DB_NAME_LEN - 1); }
49

50 51 52 53 54 55
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
  char key[512] = {0};
  snprintf(key, sizeof(key), "%s:%s:%s:%d", user, auth, ip, port);
  return strdup(key);
}

dengyihao's avatar
dengyihao 已提交
56
static STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
L
Liu Jicong 已提交
57
                                SAppInstInfo* pAppInfo, int connType);
58

dengyihao's avatar
dengyihao 已提交
59
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
L
Liu Jicong 已提交
60
                            uint16_t port, int connType) {
61 62 63 64
  if (taos_init() != TSDB_CODE_SUCCESS) {
    return NULL;
  }

65 66 67 68 69
  if (!validateUserName(user)) {
    terrno = TSDB_CODE_TSC_INVALID_USER_LENGTH;
    return NULL;
  }

70
  char localDb[TSDB_DB_NAME_LEN] = {0};
H
Haojun Liao 已提交
71
  if (db != NULL && strlen(db) > 0) {
dengyihao's avatar
dengyihao 已提交
72
    if (!validateDbName(db)) {
73 74 75 76
      terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH;
      return NULL;
    }

77 78
    tstrncpy(localDb, db, sizeof(localDb));
    strdequote(localDb);
79 80
  }

81
  char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
82 83 84 85 86 87
  if (auth == NULL) {
    if (!validatePassword(pass)) {
      terrno = TSDB_CODE_TSC_INVALID_PASS_LENGTH;
      return NULL;
    }

dengyihao's avatar
dengyihao 已提交
88
    taosEncryptPass_c((uint8_t*)pass, strlen(pass), secretEncrypt);
89 90 91 92
  } else {
    tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt));
  }

93
  SCorEpSet epSet = {0};
S
Shengliang Guan 已提交
94 95 96 97 98 99 100 101 102
  if (ip) {
    if (initEpSetFromCfg(ip, NULL, &epSet) < 0) {
      return NULL;
    }
  } else {
    if (initEpSetFromCfg(tsFirst, tsSecond, &epSet) < 0) {
      return NULL;
    }
  }
103

104 105
  if (port) {
    epSet.epSet.eps[0].port = port;
S
Shengliang Guan 已提交
106
    epSet.epSet.eps[1].port = port;
107 108
  }

109
  char* key = getClusterKey(user, secretEncrypt, ip, port);
110

111
  SAppInstInfo** pInst = NULL;
wafwerar's avatar
wafwerar 已提交
112
  taosThreadMutexLock(&appInfo.mutex);
H
Haojun Liao 已提交
113 114

  pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
L
Liu Jicong 已提交
115
  SAppInstInfo* p = NULL;
116
  if (pInst == NULL) {
wafwerar's avatar
wafwerar 已提交
117
    p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo));
dengyihao's avatar
dengyihao 已提交
118
    p->mgmtEp = epSet;
H
Haojun Liao 已提交
119
    p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
D
dapan 已提交
120
    p->pAppHbMgr = appHbMgrInit(p, key);
H
Haojun Liao 已提交
121
    taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
122

H
Haojun Liao 已提交
123
    pInst = &p;
124 125
  }

wafwerar's avatar
wafwerar 已提交
126
  taosThreadMutexUnlock(&appInfo.mutex);
H
Haojun Liao 已提交
127

wafwerar's avatar
wafwerar 已提交
128
  taosMemoryFreeClear(key);
L
Liu Jicong 已提交
129
  return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst, connType);
130 131
}

dengyihao's avatar
dengyihao 已提交
132
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest) {
X
Xiaoyu Wang 已提交
133 134
  *pRequest = createRequest(pTscObj, NULL, NULL, TSDB_SQL_SELECT);
  if (*pRequest == NULL) {
135
    tscError("failed to malloc sqlObj");
X
Xiaoyu Wang 已提交
136
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
137 138
  }

wafwerar's avatar
wafwerar 已提交
139
  (*pRequest)->sqlstr = taosMemoryMalloc(sqlLen + 1);
X
Xiaoyu Wang 已提交
140
  if ((*pRequest)->sqlstr == NULL) {
dengyihao's avatar
dengyihao 已提交
141
    tscError("0x%" PRIx64 " failed to prepare sql string buffer", (*pRequest)->self);
X
Xiaoyu Wang 已提交
142 143
    (*pRequest)->msgBuf = strdup("failed to prepare sql string buffer");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
144 145
  }

X
Xiaoyu Wang 已提交
146 147 148
  strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen);
  (*pRequest)->sqlstr[sqlLen] = 0;
  (*pRequest)->sqlLen = sqlLen;
149

dengyihao's avatar
dengyihao 已提交
150 151
  if (taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self,
                  sizeof((*pRequest)->self))) {
D
dapan1121 已提交
152 153 154 155 156 157
    destroyRequest(*pRequest);
    *pRequest = NULL;
    tscError("put request to request hash failed");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

dengyihao's avatar
dengyihao 已提交
158
  tscDebugL("0x%" PRIx64 " SQL: %s, reqId:0x%" PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId);
X
Xiaoyu Wang 已提交
159 160
  return TSDB_CODE_SUCCESS;
}
161

D
stmt  
dapan1121 已提交
162
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb) {
H
Haojun Liao 已提交
163 164
  STscObj* pTscObj = pRequest->pTscObj;

165 166 167 168 169 170 171 172 173 174 175
  SParseContext cxt = {.requestId = pRequest->requestId,
                       .acctId = pTscObj->acctId,
                       .db = pRequest->pDb,
                       .topicQuery = topicQuery,
                       .pSql = pRequest->sqlstr,
                       .sqlLen = pRequest->sqlLen,
                       .pMsg = pRequest->msgBuf,
                       .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
                       .pTransporter = pTscObj->pAppInfo->pTransporter,
                       .pStmtCb = pStmtCb,
                       .pUser = pTscObj->user};
H
Haojun Liao 已提交
176

H
Haojun Liao 已提交
177 178
  cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
H
Haojun Liao 已提交
179 180 181 182 183
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  code = qParseQuerySql(&cxt, pQuery);
X
Xiaoyu Wang 已提交
184 185 186
  if (TSDB_CODE_SUCCESS == code) {
    if ((*pQuery)->haveResultSet) {
      setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols);
187
      setResPrecision(&pRequest->body.resInfo, (*pQuery)->precision);
X
Xiaoyu Wang 已提交
188
    }
H
Haojun Liao 已提交
189

wafwerar's avatar
wafwerar 已提交
190 191
    TSWAP(pRequest->dbList, (*pQuery)->pDbList);
    TSWAP(pRequest->tableList, (*pQuery)->pTableList);
X
Xiaoyu Wang 已提交
192
  }
193

X
Xiaoyu Wang 已提交
194 195
  return code;
}
H
Haojun Liao 已提交
196

197 198
int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
  SRetrieveTableRsp* pRsp = NULL;
L
Liu Jicong 已提交
199
  int32_t            code = qExecCommand(pQuery->pRoot, &pRsp);
200
  if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
201
    code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false);
202 203 204 205
  }
  return code;
}

X
Xiaoyu Wang 已提交
206
int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
X
Xiaoyu Wang 已提交
207 208 209 210 211
  // drop table if exists not_exists_table
  if (NULL == pQuery->pCmdMsg) {
    return TSDB_CODE_SUCCESS;
  }

212 213 214
  SCmdMsgInfo* pMsgInfo = pQuery->pCmdMsg;
  pRequest->type = pMsgInfo->msgType;
  pRequest->body.requestMsg = (SDataBuf){.pData = pMsgInfo->pMsg, .len = pMsgInfo->msgLen, .handle = NULL};
L
Liu Jicong 已提交
215
  pMsgInfo->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management
216 217 218

  STscObj*      pTscObj = pRequest->pTscObj;
  SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
219

220 221
  int64_t transporterId = 0;
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg);
X
Xiaoyu Wang 已提交
222

223
  tsem_wait(&pRequest->body.rspSem);
X
Xiaoyu Wang 已提交
224 225
  return TSDB_CODE_SUCCESS;
}
X
Xiaoyu Wang 已提交
226

227 228
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
  pRequest->type = pQuery->msgType;
L
Liu Jicong 已提交
229 230 231 232 233 234
  SPlanContext cxt = {.queryId = pRequest->requestId,
                      .acctId = pRequest->pTscObj->acctId,
                      .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
                      .pAstRoot = pQuery->pRoot,
                      .showRewrite = pQuery->showRewrite,
                      .pMsg = pRequest->msgBuf,
D
dapan1121 已提交
235 236
                      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
                      .placeholderNum = pQuery->placeholderNum};
237 238 239 240 241 242
  SEpSet       mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
  SCatalog*    pCatalog = NULL;
  int32_t      code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
  if (TSDB_CODE_SUCCESS == code) {
    code = catalogGetQnodeList(pCatalog, pRequest->pTscObj->pAppInfo->pTransporter, &mgmtEpSet, pNodeList);
  }
X
Xiaoyu Wang 已提交
243 244
  if (TSDB_CODE_SUCCESS == code) {
    code = qCreateQueryPlan(&cxt, pPlan, pNodeList);
X
Xiaoyu Wang 已提交
245 246
  }
  return code;
X
Xiaoyu Wang 已提交
247 248
}

249
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols) {
250
  ASSERT(pSchema != NULL && numOfCols > 0);
251

252
  pResInfo->numOfCols = numOfCols;
L
Liu Jicong 已提交
253 254 255 256 257 258
  if (pResInfo->fields != NULL) {
    taosMemoryFree(pResInfo->fields);
  }
  if (pResInfo->userFields != NULL) {
    taosMemoryFree(pResInfo->userFields);
  }
259 260
  pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
  pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
261 262

  for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
263
    pResInfo->fields[i].bytes = pSchema[i].bytes;
L
Liu Jicong 已提交
264
    pResInfo->fields[i].type = pSchema[i].type;
265 266

    pResInfo->userFields[i].bytes = pSchema[i].bytes;
L
Liu Jicong 已提交
267
    pResInfo->userFields[i].type = pSchema[i].type;
268 269 270 271 272 273 274

    if (pSchema[i].type == TSDB_DATA_TYPE_VARCHAR) {
      pResInfo->userFields[i].bytes -= VARSTR_HEADER_SIZE;
    } else if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR) {
      pResInfo->userFields[i].bytes = (pResInfo->userFields[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
    }

275
    tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
276
    tstrncpy(pResInfo->userFields[i].name, pSchema[i].name, tListLen(pResInfo->userFields[i].name));
277
  }
X
Xiaoyu Wang 已提交
278 279
}

280
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
dengyihao's avatar
dengyihao 已提交
281 282
  if (precision != TSDB_TIME_PRECISION_MILLI && precision != TSDB_TIME_PRECISION_MICRO &&
      precision != TSDB_TIME_PRECISION_NANO) {
283 284 285 286 287 288
    return;
  }

  pResInfo->precision = precision;
}

X
Xiaoyu Wang 已提交
289
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
290
  void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
291

D
dapan1121 已提交
292
  SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
L
Liu Jicong 已提交
293
  int32_t      code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr,
L
Liu Jicong 已提交
294
                                       pRequest->metric.start, &res);
D
dapan1121 已提交
295
  if (code != TSDB_CODE_SUCCESS) {
D
dapan1121 已提交
296 297
    if (pRequest->body.queryJob != 0) {
      schedulerFreeJob(pRequest->body.queryJob);
298 299
    }

D
dapan1121 已提交
300
    pRequest->code = code;
D
dapan1121 已提交
301
    terrno = code;
H
Haojun Liao 已提交
302
    return pRequest->code;
X
Xiaoyu Wang 已提交
303
  }
304

305
  if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_CREATE_TABLE == pRequest->type) {
D
dapan1121 已提交
306
    pRequest->body.resInfo.numOfRows = res.numOfRows;
dengyihao's avatar
test  
dengyihao 已提交
307

D
dapan1121 已提交
308 309
    if (pRequest->body.queryJob != 0) {
      schedulerFreeJob(pRequest->body.queryJob);
D
dapan1121 已提交
310 311
    }
  }
D
dapan1121 已提交
312

D
dapan1121 已提交
313
  pRequest->code = res.code;
L
Liu Jicong 已提交
314
  terrno = res.code;
D
dapan1121 已提交
315
  return pRequest->code;
X
Xiaoyu Wang 已提交
316
}
X
Xiaoyu Wang 已提交
317

D
dapan1121 已提交
318 319 320 321 322
int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList) {
  *pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
  return getPlan(pRequest, pQuery, &pRequest->body.pDag, *pNodeList);
}

D
stmt  
dapan1121 已提交
323
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery) {
X
Xiaoyu Wang 已提交
324 325 326 327 328 329 330 331
  if (TSDB_CODE_SUCCESS == code) {
    switch (pQuery->execMode) {
      case QUERY_EXEC_MODE_LOCAL:
        code = execLocalCmd(pRequest, pQuery);
        break;
      case QUERY_EXEC_MODE_RPC:
        code = execDdlQuery(pRequest, pQuery);
        break;
X
Xiaoyu Wang 已提交
332 333
      case QUERY_EXEC_MODE_SCHEDULE: {
        SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
X
Xiaoyu Wang 已提交
334 335 336 337
        code = getPlan(pRequest, pQuery, &pRequest->body.pDag, pNodeList);
        if (TSDB_CODE_SUCCESS == code) {
          code = scheduleQuery(pRequest, pRequest->body.pDag, pNodeList);
        }
X
Xiaoyu Wang 已提交
338
        taosArrayDestroy(pNodeList);
X
Xiaoyu Wang 已提交
339
        break;
X
Xiaoyu Wang 已提交
340
      }
X
Xiaoyu Wang 已提交
341 342 343 344 345 346
      case QUERY_EXEC_MODE_EMPTY_RESULT:
        pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
        break;
      default:
        break;
    }
X
Xiaoyu Wang 已提交
347 348
  }

D
stmt  
dapan1121 已提交
349 350 351
  if (!keepQuery) {
    qDestroyQuery(pQuery);
  }
dengyihao's avatar
dengyihao 已提交
352

D
dapan1121 已提交
353
  if (NULL != pRequest && TSDB_CODE_SUCCESS != code) {
X
Xiaoyu Wang 已提交
354 355
    pRequest->code = terrno;
  }
356

357 358 359
  return pRequest;
}

D
stmt  
dapan1121 已提交
360 361 362 363 364
SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
  SRequestObj* pRequest = NULL;
  SQuery*      pQuery = NULL;

  int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest);
365 366 367 368 369 370 371
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  code = parseSql(pRequest, false, &pQuery, NULL);
  if (code != TSDB_CODE_SUCCESS) {
372 373
    pRequest->code = code;
    return pRequest;
D
stmt  
dapan1121 已提交
374 375
  }

D
stmt  
dapan1121 已提交
376
  return launchQueryImpl(pRequest, pQuery, code, false);
D
stmt  
dapan1121 已提交
377 378
}

D
dapan1121 已提交
379
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
L
Liu Jicong 已提交
380 381 382 383
  SCatalog* pCatalog = NULL;
  int32_t   code = 0;
  int32_t   dbNum = taosArrayGetSize(pRequest->dbList);
  int32_t   tblNum = taosArrayGetSize(pRequest->tableList);
D
dapan1121 已提交
384 385 386 387

  if (dbNum <= 0 && tblNum <= 0) {
    return TSDB_CODE_QRY_APP_ERROR;
  }
L
Liu Jicong 已提交
388

D
dapan1121 已提交
389 390 391 392 393 394 395 396
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SEpSet epset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);

  for (int32_t i = 0; i < dbNum; ++i) {
L
Liu Jicong 已提交
397 398
    char* dbFName = taosArrayGet(pRequest->dbList, i);

D
dapan1121 已提交
399 400 401
    code = catalogRefreshDBVgInfo(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, dbFName);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
D
dapan1121 已提交
402 403 404
    }
  }

D
dapan1121 已提交
405
  for (int32_t i = 0; i < tblNum; ++i) {
L
Liu Jicong 已提交
406
    SName* tableName = taosArrayGet(pRequest->tableList, i);
D
dapan1121 已提交
407 408 409 410 411

    code = catalogRefreshTableMeta(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, tableName, -1);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
D
dapan1121 已提交
412 413
  }

D
dapan1121 已提交
414
  return code;
D
dapan1121 已提交
415 416 417 418
}

SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
  SRequestObj* pRequest = NULL;
L
Liu Jicong 已提交
419 420
  int32_t      retryNum = 0;
  int32_t      code = 0;
D
dapan1121 已提交
421

D
dapan1121 已提交
422
  while (retryNum++ < REQUEST_MAX_TRY_TIMES) {
D
stmt  
dapan1121 已提交
423
    pRequest = launchQuery(pTscObj, sql, sqlLen);
424
    if (pRequest == NULL || TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) {
D
dapan1121 已提交
425 426 427
      break;
    }

D
dapan1121 已提交
428 429 430
    code = refreshMeta(pTscObj, pRequest);
    if (code) {
      pRequest->code = code;
D
dapan1121 已提交
431 432
      break;
    }
D
dapan1121 已提交
433 434

    destroyRequest(pRequest);
D
dapan1121 已提交
435
  }
L
Liu Jicong 已提交
436

D
dapan1121 已提交
437 438 439 440 441 442 443 444 445 446 447 448 449 450
  return pRequest;
}

TAOS_RES* taos_query_l(TAOS* taos, const char* sql, int sqlLen) {
  STscObj* pTscObj = (STscObj*)taos;
  if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
    tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    return NULL;
  }

  return execQuery(pTscObj, sql, sqlLen);
}

S
Shengliang Guan 已提交
451 452
int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
  pEpSet->version = 0;
453

H
Haojun Liao 已提交
454
  // init mnode ip set
dengyihao's avatar
dengyihao 已提交
455
  SEpSet* mgmtEpSet = &(pEpSet->epSet);
456
  mgmtEpSet->numOfEps = 0;
dengyihao's avatar
dengyihao 已提交
457
  mgmtEpSet->inUse = 0;
458

S
Shengliang Guan 已提交
459 460 461 462
  if (firstEp && firstEp[0] != 0) {
    if (strlen(firstEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
S
Shengliang Guan 已提交
463
    }
S
Shengliang Guan 已提交
464

465 466 467 468 469 470
    int32_t code = taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[0]);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return terrno;
    }

S
Shengliang Guan 已提交
471 472 473 474 475 476 477
    mgmtEpSet->numOfEps++;
  }

  if (secondEp && secondEp[0] != 0) {
    if (strlen(secondEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
478
    }
S
Shengliang Guan 已提交
479 480 481

    taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
    mgmtEpSet->numOfEps++;
482 483 484 485 486 487 488 489 490 491
  }

  if (mgmtEpSet->numOfEps == 0) {
    terrno = TSDB_CODE_TSC_INVALID_FQDN;
    return -1;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
492
STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
L
Liu Jicong 已提交
493
                         SAppInstInfo* pAppInfo, int connType) {
dengyihao's avatar
dengyihao 已提交
494
  STscObj* pTscObj = createTscObj(user, auth, db, pAppInfo);
495 496 497 498 499
  if (NULL == pTscObj) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return pTscObj;
  }

dengyihao's avatar
dengyihao 已提交
500
  SRequestObj* pRequest = createRequest(pTscObj, fp, param, TDMT_MND_CONNECT);
501 502 503
  if (pRequest == NULL) {
    destroyTscObj(pTscObj);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
504 505 506
    return NULL;
  }

L
Liu Jicong 已提交
507
  SMsgSendInfo* body = buildConnectMsg(pRequest, connType);
508 509

  int64_t transporterId = 0;
H
Haojun Liao 已提交
510
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
511 512 513

  tsem_wait(&pRequest->body.rspSem);
  if (pRequest->code != TSDB_CODE_SUCCESS) {
dengyihao's avatar
dengyihao 已提交
514 515
    const char* errorMsg =
        (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
516 517 518 519 520 521
    printf("failed to connect to server, reason: %s\n\n", errorMsg);

    destroyRequest(pRequest);
    taos_close(pTscObj);
    pTscObj = NULL;
  } else {
D
dapan1121 已提交
522
    tscDebug("0x%" PRIx64 " connection is opening, connId:%u, dnodeConn:%p, reqId:0x%" PRIx64, pTscObj->id,
dengyihao's avatar
dengyihao 已提交
523
             pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId);
524 525 526 527 528 529
    destroyRequest(pRequest);
  }

  return pTscObj;
}

L
Liu Jicong 已提交
530
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest, int8_t connType) {
wafwerar's avatar
wafwerar 已提交
531
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
532 533 534 535 536
  if (pMsgSendInfo == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

dengyihao's avatar
dengyihao 已提交
537
  pMsgSendInfo->msgType = TDMT_MND_CONNECT;
538

539
  pMsgSendInfo->requestObjRefId = pRequest->self;
dengyihao's avatar
dengyihao 已提交
540 541 542
  pMsgSendInfo->requestId = pRequest->requestId;
  pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)];
  pMsgSendInfo->param = pRequest;
543

S
Shengliang Guan 已提交
544 545
  SConnectReq connectReq = {0};
  STscObj*    pObj = pRequest->pTscObj;
546

H
Haojun Liao 已提交
547
  char* db = getDbOfConnection(pObj);
548
  if (db != NULL) {
S
Shengliang Guan 已提交
549
    tstrncpy(connectReq.db, db, sizeof(connectReq.db));
550
  }
wafwerar's avatar
wafwerar 已提交
551
  taosMemoryFreeClear(db);
552

L
Liu Jicong 已提交
553
  connectReq.connType = connType;
S
Shengliang Guan 已提交
554 555 556
  connectReq.pid = htonl(appInfo.pid);
  connectReq.startTime = htobe64(appInfo.startTime);
  tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));
dengyihao's avatar
dengyihao 已提交
557 558
  tstrncpy(connectReq.user, pObj->user, sizeof(connectReq.user));
  tstrncpy(connectReq.passwd, pObj->pass, sizeof(connectReq.passwd));
S
Shengliang Guan 已提交
559 560

  int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
wafwerar's avatar
wafwerar 已提交
561
  void*   pReq = taosMemoryMalloc(contLen);
S
Shengliang Guan 已提交
562
  tSerializeSConnectReq(pReq, contLen, &connectReq);
563

S
Shengliang Guan 已提交
564 565
  pMsgSendInfo->msgInfo.len = contLen;
  pMsgSendInfo->msgInfo.pData = pReq;
566
  return pMsgSendInfo;
567 568
}

569
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
H
Haojun Liao 已提交
570
  assert(pMsgBody != NULL);
wafwerar's avatar
wafwerar 已提交
571 572
  taosMemoryFreeClear(pMsgBody->msgInfo.pData);
  taosMemoryFreeClear(pMsgBody);
H
Haojun Liao 已提交
573
}
574

dengyihao's avatar
dengyihao 已提交
575
bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType) {
L
Liu Jicong 已提交
576 577
  return msgType == TDMT_VND_QUERY_RSP || msgType == TDMT_VND_FETCH_RSP || msgType == TDMT_VND_RES_READY_RSP ||
         msgType == TDMT_VND_QUERY_HEARTBEAT_RSP;
dengyihao's avatar
dengyihao 已提交
578
}
579

580
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
dengyihao's avatar
dengyihao 已提交
581
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->ahandle;
582
  assert(pMsg->ahandle != NULL);
583

584
  if (pSendInfo->requestObjRefId != 0) {
dengyihao's avatar
dengyihao 已提交
585
    SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
586
    assert(pRequest->self == pSendInfo->requestObjRefId);
587

D
dapan1121 已提交
588
    pRequest->metric.rsp = taosGetTimestampUs();
589

dengyihao's avatar
dengyihao 已提交
590
    STscObj* pTscObj = pRequest->pTscObj;
591 592 593 594
    if (pEpSet) {
      if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, pEpSet)) {
        updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
      }
595 596
    }

597
    /*
dengyihao's avatar
dengyihao 已提交
598 599
     * There is not response callback function for submit response.
     * The actual inserted number of points is the first number.
600
     */
601
    int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start;
602
    if (pMsg->code == TSDB_CODE_SUCCESS) {
dengyihao's avatar
dengyihao 已提交
603
      tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%" PRIx64, pRequest->self,
L
Liu Jicong 已提交
604
               TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed / 1000, pRequest->requestId);
605
    } else {
dengyihao's avatar
dengyihao 已提交
606
      tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%" PRIx64, pRequest->self,
L
Liu Jicong 已提交
607
               TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed / 1000, pRequest->requestId);
608
    }
609

H
Haojun Liao 已提交
610
    taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
611 612
  }

dengyihao's avatar
dengyihao 已提交
613
  SDataBuf buf = {.len = pMsg->contLen, .pData = NULL, .handle = pMsg->handle};
614 615

  if (pMsg->contLen > 0) {
wafwerar's avatar
wafwerar 已提交
616
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
617 618 619 620 621 622
    if (buf.pData == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    }
623 624
  }

625
  pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
626
  rpcFreeCont(pMsg->pCont);
627
  destroySendMsgInfo(pSendInfo);
628
}
629

dengyihao's avatar
dengyihao 已提交
630
TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) {
631 632 633 634 635 636 637 638 639 640
  tscDebug("try to connect to %s:%u by auth, user:%s db:%s", ip, port, user, db);
  if (user == NULL) {
    user = TSDB_DEFAULT_USER;
  }

  if (auth == NULL) {
    tscError("No auth info is given, failed to connect to server");
    return NULL;
  }

L
Liu Jicong 已提交
641
  return taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY);
642 643
}

dengyihao's avatar
dengyihao 已提交
644 645 646
TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen,
                     const char* db, int dbLen, uint16_t port) {
  char ipStr[TSDB_EP_LEN] = {0};
647
  char dbStr[TSDB_DB_NAME_LEN] = {0};
dengyihao's avatar
dengyihao 已提交
648 649
  char userStr[TSDB_USER_LEN] = {0};
  char passStr[TSDB_PASSWORD_LEN] = {0};
650

dengyihao's avatar
dengyihao 已提交
651
  strncpy(ipStr, ip, TMIN(TSDB_EP_LEN - 1, ipLen));
dengyihao's avatar
dengyihao 已提交
652 653
  strncpy(userStr, user, TMIN(TSDB_USER_LEN - 1, userLen));
  strncpy(passStr, pass, TMIN(TSDB_PASSWORD_LEN - 1, passLen));
dengyihao's avatar
dengyihao 已提交
654
  strncpy(dbStr, db, TMIN(TSDB_DB_NAME_LEN - 1, dbLen));
655
  return taos_connect(ipStr, userStr, passStr, dbStr, port);
656 657
}

L
Liu Jicong 已提交
658
void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
659 660 661 662 663 664 665
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
    SResultColumn* pCol = &pResultInfo->pCol[i];

    int32_t type = pResultInfo->fields[i].type;
    int32_t bytes = pResultInfo->fields[i].bytes;

    if (IS_VAR_DATA_TYPE(type)) {
666
      if (pCol->offset[pResultInfo->current] != -1) {
667 668 669 670 671 672
        char* pStart = pResultInfo->pCol[i].offset[pResultInfo->current] + pResultInfo->pCol[i].pData;

        pResultInfo->length[i] = varDataLen(pStart);
        pResultInfo->row[i] = varDataVal(pStart);
      } else {
        pResultInfo->row[i] = NULL;
673
        pResultInfo->length[i] = 0;
674 675 676 677
      }
    } else {
      if (!colDataIsNull_f(pCol->nullbitmap, pResultInfo->current)) {
        pResultInfo->row[i] = pResultInfo->pCol[i].pData + bytes * pResultInfo->current;
678
        pResultInfo->length[i] = bytes;
679 680
      } else {
        pResultInfo->row[i] = NULL;
681
        pResultInfo->length[i] = 0;
682 683 684 685 686
      }
    }
  }
}

687
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
688
  assert(pRequest != NULL);
H
Haojun Liao 已提交
689

H
Haojun Liao 已提交
690
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
H
Haojun Liao 已提交
691
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
H
Haojun Liao 已提交
692
    // All data has returned to App already, no need to try again
693 694 695 696 697
    if (pResultInfo->completed) {
      pResultInfo->numOfRows = 0;
      return NULL;
    }

H
Haojun Liao 已提交
698 699 700 701 702 703
    SReqResultInfo* pResInfo = &pRequest->body.resInfo;
    pRequest->code = schedulerFetchRows(pRequest->body.queryJob, (void**)&pResInfo->pData);
    if (pRequest->code != TSDB_CODE_SUCCESS) {
      pResultInfo->numOfRows = 0;
      return NULL;
    }
H
Haojun Liao 已提交
704

H
Haojun Liao 已提交
705 706 707 708 709
    pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData, convertUcs4);
    if (pRequest->code != TSDB_CODE_SUCCESS) {
      pResultInfo->numOfRows = 0;
      return NULL;
    }
H
Haojun Liao 已提交
710

H
Haojun Liao 已提交
711 712
    tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
             pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId);
H
Haojun Liao 已提交
713

H
Haojun Liao 已提交
714
    if (pResultInfo->numOfRows == 0) {
H
Haojun Liao 已提交
715 716 717 718
      return NULL;
    }
  }

719 720 721
  if (setupOneRowPtr) {
    doSetOneRowPtr(pResultInfo);
    pResultInfo->current += 1;
H
Haojun Liao 已提交
722 723 724 725 726
  }

  return pResultInfo->row;
}

727
static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
H
Haojun Liao 已提交
728
  if (pResInfo->row == NULL) {
L
Liu Jicong 已提交
729 730
    pResInfo->row = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
    pResInfo->pCol = taosMemoryCalloc(pResInfo->numOfCols, sizeof(SResultColumn));
wafwerar's avatar
wafwerar 已提交
731 732
    pResInfo->length = taosMemoryCalloc(pResInfo->numOfCols, sizeof(int32_t));
    pResInfo->convertBuf = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
733

734 735 736
    if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL || pResInfo->convertBuf == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
737
  }
738 739

  return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
740 741
}

742 743 744 745 746
static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int32_t numOfCols, int32_t* colLength) {
  for (int32_t i = 0; i < numOfCols; ++i) {
    int32_t type = pResultInfo->fields[i].type;
    int32_t bytes = pResultInfo->fields[i].bytes;

747
    if (type == TSDB_DATA_TYPE_NCHAR && colLength[i] > 0) {
748 749 750 751 752 753 754 755 756 757 758 759 760 761
      char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]);
      if (p == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      pResultInfo->convertBuf[i] = p;

      SResultColumn* pCol = &pResultInfo->pCol[i];
      for (int32_t j = 0; j < numOfRows; ++j) {
        if (pCol->offset[j] != -1) {
          char* pStart = pCol->offset[j] + pCol->pData;

          int32_t len = taosUcs4ToMbs((TdUcs4*)varDataVal(pStart), varDataLen(pStart), varDataVal(p));
          ASSERT(len <= bytes);
D
stmt  
dapan1121 已提交
762
          ASSERT((p + len) < (pResultInfo->convertBuf[i] + colLength[i]));
dengyihao's avatar
dengyihao 已提交
763

764 765 766 767 768 769 770 771 772
          varDataSetLen(p, len);
          pCol->offset[j] = (p - pResultInfo->convertBuf[i]);
          p += (len + VARSTR_HEADER_SIZE);
        }
      }

      pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
      pResultInfo->row[i] = pResultInfo->pCol[i].pData;
    }
773 774 775 776 777 778 779 780

    if (type == TSDB_DATA_TYPE_JSON) {
      char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]);
      if (p == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      pResultInfo->convertBuf[i] = p;
dengyihao's avatar
dengyihao 已提交
781
      int32_t        len = 0;
782 783 784 785 786 787
      SResultColumn* pCol = &pResultInfo->pCol[i];
      for (int32_t j = 0; j < numOfRows; ++j) {
        if (pCol->offset[j] != -1) {
          char* pStart = pCol->offset[j] + pCol->pData;

          int32_t jsonInnerType = *pStart;
dengyihao's avatar
dengyihao 已提交
788 789 790
          char*   jsonInnerData = pStart + CHAR_BYTES;
          char    dst[TSDB_MAX_JSON_TAG_LEN] = {0};
          if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
791 792
            sprintf(varDataVal(dst), "%s", TSDB_DATA_NULL_STR_L);
            varDataSetLen(dst, strlen(varDataVal(dst)));
dengyihao's avatar
dengyihao 已提交
793 794 795
          } else if (jsonInnerType == TSDB_DATA_TYPE_JSON) {
            int32_t length =
                taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData), varDataVal(dst));
796 797

            if (length <= 0) {
dengyihao's avatar
dengyihao 已提交
798 799
              tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset,
                       varDataVal(jsonInnerData));
800 801 802
              length = 0;
            }
            varDataSetLen(dst, length);
dengyihao's avatar
dengyihao 已提交
803
          } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
804
            *(char*)varDataVal(dst) = '\"';
dengyihao's avatar
dengyihao 已提交
805 806
            int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData),
                                           varDataVal(dst) + CHAR_BYTES);
807
            if (length <= 0) {
dengyihao's avatar
dengyihao 已提交
808 809
              tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset,
                       varDataVal(jsonInnerData));
810 811
              length = 0;
            }
dengyihao's avatar
dengyihao 已提交
812
            varDataSetLen(dst, length + CHAR_BYTES * 2);
813
            *(char*)(varDataVal(dst), length + CHAR_BYTES) = '\"';
dengyihao's avatar
dengyihao 已提交
814
          } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
815 816 817
            double jsonVd = *(double*)(jsonInnerData);
            sprintf(varDataVal(dst), "%.9lf", jsonVd);
            varDataSetLen(dst, strlen(varDataVal(dst)));
dengyihao's avatar
dengyihao 已提交
818
          } else if (jsonInnerType == TSDB_DATA_TYPE_BIGINT) {
819 820 821
            int64_t jsonVd = *(int64_t*)(jsonInnerData);
            sprintf(varDataVal(dst), "%" PRId64, jsonVd);
            varDataSetLen(dst, strlen(varDataVal(dst)));
dengyihao's avatar
dengyihao 已提交
822 823
          } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
            sprintf(varDataVal(dst), "%s", (*((char*)jsonInnerData) == 1) ? "true" : "false");
824
            varDataSetLen(dst, strlen(varDataVal(dst)));
dengyihao's avatar
dengyihao 已提交
825
          } else {
826 827 828
            ASSERT(0);
          }

dengyihao's avatar
dengyihao 已提交
829
          if (len + varDataTLen(dst) > colLength[i]) {
830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846
            p = taosMemoryRealloc(pResultInfo->convertBuf[i], len + varDataTLen(dst));
            if (p == NULL) {
              return TSDB_CODE_OUT_OF_MEMORY;
            }

            pResultInfo->convertBuf[i] = p;
          }
          p = pResultInfo->convertBuf[i] + len;
          memcpy(p, dst, varDataTLen(dst));
          pCol->offset[j] = len;
          len += varDataTLen(dst);
        }
      }

      pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
      pResultInfo->row[i] = pResultInfo->pCol[i].pData;
    }
847 848 849 850 851
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
852 853
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows,
                         bool convertUcs4) {
H
Haojun Liao 已提交
854 855
  assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL);
  if (numOfRows == 0) {
856
    return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
857 858
  }

859 860 861 862
  int32_t code = doPrepareResPtr(pResultInfo);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }
H
Haojun Liao 已提交
863

L
Liu Jicong 已提交
864
  char* p = (char*)pResultInfo->pData;
865

L
Liu Jicong 已提交
866
  int32_t dataLen = *(int32_t*)p;
867 868
  p += sizeof(int32_t);

L
Liu Jicong 已提交
869
  uint64_t groupId = *(uint64_t*)p;
870 871 872 873 874 875
  p += sizeof(uint64_t);

  int32_t* colLength = (int32_t*)p;
  p += sizeof(int32_t) * numOfCols;

  char* pStart = p;
H
Haojun Liao 已提交
876
  for (int32_t i = 0; i < numOfCols; ++i) {
877
    colLength[i] = htonl(colLength[i]);
878
    ASSERT(colLength[i] < dataLen);
879 880 881 882 883 884 885 886 887 888

    if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
      pResultInfo->pCol[i].offset = (int32_t*)pStart;
      pStart += numOfRows * sizeof(int32_t);
    } else {
      pResultInfo->pCol[i].nullbitmap = pStart;
      pStart += BitmapLen(pResultInfo->numOfRows);
    }

    pResultInfo->pCol[i].pData = pStart;
H
Haojun Liao 已提交
889
    pResultInfo->length[i] = pResultInfo->fields[i].bytes;
890 891 892
    pResultInfo->row[i] = pResultInfo->pCol[i].pData;

    pStart += colLength[i];
893
  }
894

H
Haojun Liao 已提交
895
  // convert UCS4-LE encoded character to native multi-bytes character in current data block.
896 897
  if (convertUcs4) {
    code = doConvertUCS4(pResultInfo, numOfRows, numOfCols, colLength);
898 899
  }

900
  return code;
S
Shengliang Guan 已提交
901 902
}

H
Haojun Liao 已提交
903
char* getDbOfConnection(STscObj* pObj) {
dengyihao's avatar
dengyihao 已提交
904
  char* p = NULL;
wafwerar's avatar
wafwerar 已提交
905
  taosThreadMutexLock(&pObj->mutex);
906 907 908 909
  size_t len = strlen(pObj->db);
  if (len > 0) {
    p = strndup(pObj->db, tListLen(pObj->db));
  }
S
Shengliang Guan 已提交
910

wafwerar's avatar
wafwerar 已提交
911
  taosThreadMutexUnlock(&pObj->mutex);
912 913 914 915 916
  return p;
}

void setConnectionDB(STscObj* pTscObj, const char* db) {
  assert(db != NULL && pTscObj != NULL);
wafwerar's avatar
wafwerar 已提交
917
  taosThreadMutexLock(&pTscObj->mutex);
918
  tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
wafwerar's avatar
wafwerar 已提交
919
  taosThreadMutexUnlock(&pTscObj->mutex);
920
}
S
Shengliang Guan 已提交
921

H
Haojun Liao 已提交
922 923 924 925 926 927 928 929 930 931
void resetConnectDB(STscObj* pTscObj) {
  if (pTscObj == NULL) {
    return;
  }

  taosThreadMutexLock(&pTscObj->mutex);
  pTscObj->db[0] = 0;
  taosThreadMutexUnlock(&pTscObj->mutex);
}

932
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4) {
933 934
  assert(pResultInfo != NULL && pRsp != NULL);

L
Liu Jicong 已提交
935 936 937 938 939
  pResultInfo->pRspMsg = (const char*)pRsp;
  pResultInfo->pData = (void*)pRsp->data;
  pResultInfo->numOfRows = htonl(pRsp->numOfRows);
  pResultInfo->current = 0;
  pResultInfo->completed = (pRsp->completed == 1);
940
  pResultInfo->payloadLen = htonl(pRsp->compLen);
L
Liu Jicong 已提交
941
  pResultInfo->precision = pRsp->precision;
H
Haojun Liao 已提交
942

943
  // TODO handle the compressed case
944
  pResultInfo->totalRows += pResultInfo->numOfRows;
L
Liu Jicong 已提交
945 946
  return setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows,
                          convertUcs4);
L
fix  
Liu Jicong 已提交
947
}
S
Shengliang Guan 已提交
948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976

TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* details, int maxlen) {
  TSDB_SERVER_STATUS code = TSDB_SRV_STATUS_UNAVAILABLE;
  void*              clientRpc = NULL;
  SServerStatusRsp   statusRsp = {0};
  SEpSet             epSet = {.inUse = 0, .numOfEps = 1};
  SRpcMsg            rpcMsg = {.ahandle = (void*)0x9526, .msgType = TDMT_DND_SERVER_STATUS};
  SRpcMsg            rpcRsp = {0};
  SRpcInit           rpcInit = {0};
  char               pass[TSDB_PASSWORD_LEN + 1] = {0};

  taosEncryptPass_c((uint8_t*)("_pwd"), strlen("_pwd"), pass);
  rpcInit.label = "CHK";
  rpcInit.numOfThreads = 1;
  rpcInit.cfp = NULL;
  rpcInit.sessions = 16;
  rpcInit.connType = TAOS_CONN_CLIENT;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.user = "_dnd";
  rpcInit.ckey = "_key";
  rpcInit.spi = 1;
  rpcInit.secret = pass;

  clientRpc = rpcOpen(&rpcInit);
  if (clientRpc == NULL) {
    tscError("failed to init server status client");
    goto _OVER;
  }

S
Shengliang Guan 已提交
977 978 979 980 981 982 983 984
  if (fqdn == NULL) {
    fqdn = tsLocalFqdn;
  }

  if (port == 0) {
    port = tsServerPort;
  }

S
Shengliang Guan 已提交
985 986 987 988 989 990 991 992 993 994 995 996 997 998 999
  tstrncpy(epSet.eps[0].fqdn, fqdn, TSDB_FQDN_LEN);
  epSet.eps[0].port = (uint16_t)port;
  rpcSendRecv(clientRpc, &epSet, &rpcMsg, &rpcRsp);

  if (rpcRsp.code != 0 || rpcRsp.contLen <= 0 || rpcRsp.pCont == NULL) {
    tscError("failed to send server status req since %s", terrstr());
    goto _OVER;
  }

  if (tDeserializeSServerStatusRsp(rpcRsp.pCont, rpcRsp.contLen, &statusRsp) != 0) {
    tscError("failed to parse server status rsp since %s", terrstr());
    goto _OVER;
  }

  code = statusRsp.statusCode;
S
Shengliang Guan 已提交
1000
  if (details != NULL && statusRsp.details != NULL) {
S
Shengliang Guan 已提交
1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012
    tstrncpy(details, statusRsp.details, maxlen);
  }

_OVER:
  if (clientRpc != NULL) {
    rpcClose(clientRpc);
  }
  if (rpcRsp.pCont != NULL) {
    rpcFreeCont(rpcRsp.pCont);
  }
  return code;
}