clientImpl.c 34.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15

16
#include "clientInt.h"
17
#include "clientLog.h"
18
#include "command.h"
19
#include "scheduler.h"
H
Haojun Liao 已提交
20
#include "tdatablock.h"
21
#include "tdef.h"
22
#include "tglobal.h"
23
#include "tmsgtype.h"
H
Haojun Liao 已提交
24
#include "tpagedbuf.h"
25
#include "tref.h"
X
Xiaoyu Wang 已提交
26

S
Shengliang Guan 已提交
27
static int32_t       initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
28
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest);
dengyihao's avatar
dengyihao 已提交
29
static void          destroySendMsgInfo(SMsgSendInfo* pMsgBody);
H
Haojun Liao 已提交
30

31
static bool stringLengthCheck(const char* str, size_t maxsize) {
32 33 34 35 36 37 38 39 40 41 42 43
  if (str == NULL) {
    return false;
  }

  size_t len = strlen(str);
  if (len <= 0 || len > maxsize) {
    return false;
  }

  return true;
}

dengyihao's avatar
dengyihao 已提交
44
static bool validateUserName(const char* user) { return stringLengthCheck(user, TSDB_USER_LEN - 1); }
45

dengyihao's avatar
dengyihao 已提交
46
static bool validatePassword(const char* passwd) { return stringLengthCheck(passwd, TSDB_PASSWORD_LEN - 1); }
47

dengyihao's avatar
dengyihao 已提交
48
static bool validateDbName(const char* db) { return stringLengthCheck(db, TSDB_DB_NAME_LEN - 1); }
49

50 51 52 53 54 55
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
  char key[512] = {0};
  snprintf(key, sizeof(key), "%s:%s:%s:%d", user, auth, ip, port);
  return strdup(key);
}

dengyihao's avatar
dengyihao 已提交
56
static STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
L
Liu Jicong 已提交
57
                                SAppInstInfo* pAppInfo, int connType);
58

dengyihao's avatar
dengyihao 已提交
59
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
L
Liu Jicong 已提交
60
                            uint16_t port, int connType) {
61 62 63 64
  if (taos_init() != TSDB_CODE_SUCCESS) {
    return NULL;
  }

65 66 67 68 69
  if (!validateUserName(user)) {
    terrno = TSDB_CODE_TSC_INVALID_USER_LENGTH;
    return NULL;
  }

70
  char localDb[TSDB_DB_NAME_LEN] = {0};
H
Haojun Liao 已提交
71
  if (db != NULL && strlen(db) > 0) {
dengyihao's avatar
dengyihao 已提交
72
    if (!validateDbName(db)) {
73 74 75 76
      terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH;
      return NULL;
    }

77 78
    tstrncpy(localDb, db, sizeof(localDb));
    strdequote(localDb);
79 80
  }

81
  char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
82 83 84 85 86 87
  if (auth == NULL) {
    if (!validatePassword(pass)) {
      terrno = TSDB_CODE_TSC_INVALID_PASS_LENGTH;
      return NULL;
    }

dengyihao's avatar
dengyihao 已提交
88
    taosEncryptPass_c((uint8_t*)pass, strlen(pass), secretEncrypt);
89 90 91 92
  } else {
    tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt));
  }

93
  SCorEpSet epSet = {0};
S
Shengliang Guan 已提交
94 95 96 97 98 99 100 101 102
  if (ip) {
    if (initEpSetFromCfg(ip, NULL, &epSet) < 0) {
      return NULL;
    }
  } else {
    if (initEpSetFromCfg(tsFirst, tsSecond, &epSet) < 0) {
      return NULL;
    }
  }
103

104 105
  if (port) {
    epSet.epSet.eps[0].port = port;
S
Shengliang Guan 已提交
106
    epSet.epSet.eps[1].port = port;
107 108
  }

109
  char* key = getClusterKey(user, secretEncrypt, ip, port);
110

111
  SAppInstInfo** pInst = NULL;
wafwerar's avatar
wafwerar 已提交
112
  taosThreadMutexLock(&appInfo.mutex);
H
Haojun Liao 已提交
113 114

  pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
L
Liu Jicong 已提交
115
  SAppInstInfo* p = NULL;
116
  if (pInst == NULL) {
wafwerar's avatar
wafwerar 已提交
117
    p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo));
dengyihao's avatar
dengyihao 已提交
118
    p->mgmtEp = epSet;
H
Haojun Liao 已提交
119
    p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
D
dapan 已提交
120
    p->pAppHbMgr = appHbMgrInit(p, key);
H
Haojun Liao 已提交
121
    taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
122

H
Haojun Liao 已提交
123
    pInst = &p;
124 125
  }

wafwerar's avatar
wafwerar 已提交
126
  taosThreadMutexUnlock(&appInfo.mutex);
H
Haojun Liao 已提交
127

wafwerar's avatar
wafwerar 已提交
128
  taosMemoryFreeClear(key);
L
Liu Jicong 已提交
129
  return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst, connType);
130 131
}

dengyihao's avatar
dengyihao 已提交
132
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest) {
X
Xiaoyu Wang 已提交
133 134
  *pRequest = createRequest(pTscObj, NULL, NULL, TSDB_SQL_SELECT);
  if (*pRequest == NULL) {
135
    tscError("failed to malloc sqlObj");
X
Xiaoyu Wang 已提交
136
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
137 138
  }

wafwerar's avatar
wafwerar 已提交
139
  (*pRequest)->sqlstr = taosMemoryMalloc(sqlLen + 1);
X
Xiaoyu Wang 已提交
140
  if ((*pRequest)->sqlstr == NULL) {
dengyihao's avatar
dengyihao 已提交
141
    tscError("0x%" PRIx64 " failed to prepare sql string buffer", (*pRequest)->self);
X
Xiaoyu Wang 已提交
142 143
    (*pRequest)->msgBuf = strdup("failed to prepare sql string buffer");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
144 145
  }

X
Xiaoyu Wang 已提交
146 147 148
  strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen);
  (*pRequest)->sqlstr[sqlLen] = 0;
  (*pRequest)->sqlLen = sqlLen;
149

dengyihao's avatar
dengyihao 已提交
150 151
  if (taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self,
                  sizeof((*pRequest)->self))) {
D
dapan1121 已提交
152 153 154 155 156 157
    destroyRequest(*pRequest);
    *pRequest = NULL;
    tscError("put request to request hash failed");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

dengyihao's avatar
dengyihao 已提交
158
  tscDebugL("0x%" PRIx64 " SQL: %s, reqId:0x%" PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId);
X
Xiaoyu Wang 已提交
159 160
  return TSDB_CODE_SUCCESS;
}
161

D
stmt  
dapan1121 已提交
162
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb) {
H
Haojun Liao 已提交
163 164
  STscObj* pTscObj = pRequest->pTscObj;

165 166 167 168 169 170 171 172 173 174
  SParseContext cxt = {.requestId = pRequest->requestId,
                       .acctId = pTscObj->acctId,
                       .db = pRequest->pDb,
                       .topicQuery = topicQuery,
                       .pSql = pRequest->sqlstr,
                       .sqlLen = pRequest->sqlLen,
                       .pMsg = pRequest->msgBuf,
                       .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
                       .pTransporter = pTscObj->pAppInfo->pTransporter,
                       .pStmtCb = pStmtCb,
175 176
                       .pUser = pTscObj->user,
                       .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER))};
H
Haojun Liao 已提交
177

H
Haojun Liao 已提交
178 179
  cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
H
Haojun Liao 已提交
180 181 182 183
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

X
Xiaoyu Wang 已提交
184
  code = qParseSql(&cxt, pQuery);
X
Xiaoyu Wang 已提交
185 186 187
  if (TSDB_CODE_SUCCESS == code) {
    if ((*pQuery)->haveResultSet) {
      setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols);
188
      setResPrecision(&pRequest->body.resInfo, (*pQuery)->precision);
X
Xiaoyu Wang 已提交
189
    }
H
Haojun Liao 已提交
190

wafwerar's avatar
wafwerar 已提交
191 192
    TSWAP(pRequest->dbList, (*pQuery)->pDbList);
    TSWAP(pRequest->tableList, (*pQuery)->pTableList);
X
Xiaoyu Wang 已提交
193
  }
194

X
Xiaoyu Wang 已提交
195 196
  return code;
}
H
Haojun Liao 已提交
197

198 199
int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
  SRetrieveTableRsp* pRsp = NULL;
L
Liu Jicong 已提交
200
  int32_t            code = qExecCommand(pQuery->pRoot, &pRsp);
201
  if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
202
    code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false);
203 204 205 206
  }
  return code;
}

X
Xiaoyu Wang 已提交
207
int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
X
Xiaoyu Wang 已提交
208 209 210 211 212
  // drop table if exists not_exists_table
  if (NULL == pQuery->pCmdMsg) {
    return TSDB_CODE_SUCCESS;
  }

213 214 215
  SCmdMsgInfo* pMsgInfo = pQuery->pCmdMsg;
  pRequest->type = pMsgInfo->msgType;
  pRequest->body.requestMsg = (SDataBuf){.pData = pMsgInfo->pMsg, .len = pMsgInfo->msgLen, .handle = NULL};
L
Liu Jicong 已提交
216
  pMsgInfo->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management
217 218 219

  STscObj*      pTscObj = pRequest->pTscObj;
  SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
220

221 222
  int64_t transporterId = 0;
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg);
X
Xiaoyu Wang 已提交
223

224
  tsem_wait(&pRequest->body.rspSem);
X
Xiaoyu Wang 已提交
225 226
  return TSDB_CODE_SUCCESS;
}
X
Xiaoyu Wang 已提交
227

228 229
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
  pRequest->type = pQuery->msgType;
L
Liu Jicong 已提交
230 231 232 233 234 235
  SPlanContext cxt = {.queryId = pRequest->requestId,
                      .acctId = pRequest->pTscObj->acctId,
                      .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
                      .pAstRoot = pQuery->pRoot,
                      .showRewrite = pQuery->showRewrite,
                      .pMsg = pRequest->msgBuf,
236
                      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE};
237 238 239 240 241 242
  SEpSet       mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
  SCatalog*    pCatalog = NULL;
  int32_t      code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
  if (TSDB_CODE_SUCCESS == code) {
    code = catalogGetQnodeList(pCatalog, pRequest->pTscObj->pAppInfo->pTransporter, &mgmtEpSet, pNodeList);
  }
X
Xiaoyu Wang 已提交
243 244
  if (TSDB_CODE_SUCCESS == code) {
    code = qCreateQueryPlan(&cxt, pPlan, pNodeList);
X
Xiaoyu Wang 已提交
245 246
  }
  return code;
X
Xiaoyu Wang 已提交
247 248
}

249
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols) {
250
  ASSERT(pSchema != NULL && numOfCols > 0);
251

252
  pResInfo->numOfCols = numOfCols;
L
Liu Jicong 已提交
253 254 255 256 257 258
  if (pResInfo->fields != NULL) {
    taosMemoryFree(pResInfo->fields);
  }
  if (pResInfo->userFields != NULL) {
    taosMemoryFree(pResInfo->userFields);
  }
259 260
  pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
  pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
261 262

  for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
263
    pResInfo->fields[i].bytes = pSchema[i].bytes;
L
Liu Jicong 已提交
264
    pResInfo->fields[i].type = pSchema[i].type;
265 266

    pResInfo->userFields[i].bytes = pSchema[i].bytes;
L
Liu Jicong 已提交
267
    pResInfo->userFields[i].type = pSchema[i].type;
268 269 270 271 272 273 274

    if (pSchema[i].type == TSDB_DATA_TYPE_VARCHAR) {
      pResInfo->userFields[i].bytes -= VARSTR_HEADER_SIZE;
    } else if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR) {
      pResInfo->userFields[i].bytes = (pResInfo->userFields[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
    }

275
    tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
276
    tstrncpy(pResInfo->userFields[i].name, pSchema[i].name, tListLen(pResInfo->userFields[i].name));
277
  }
X
Xiaoyu Wang 已提交
278 279
}

280
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
dengyihao's avatar
dengyihao 已提交
281 282
  if (precision != TSDB_TIME_PRECISION_MILLI && precision != TSDB_TIME_PRECISION_MICRO &&
      precision != TSDB_TIME_PRECISION_NANO) {
283 284 285 286 287 288
    return;
  }

  pResInfo->precision = precision;
}

D
dapan 已提交
289
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList, void** pRes) {
290
  void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
291

D
dapan1121 已提交
292
  SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
L
Liu Jicong 已提交
293
  int32_t      code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr,
dengyihao's avatar
dengyihao 已提交
294
                                  pRequest->metric.start, &res);
D
dapan1121 已提交
295
  if (code != TSDB_CODE_SUCCESS) {
D
dapan1121 已提交
296 297
    if (pRequest->body.queryJob != 0) {
      schedulerFreeJob(pRequest->body.queryJob);
298 299
    }

D
dapan1121 已提交
300
    pRequest->code = code;
D
dapan1121 已提交
301
    terrno = code;
H
Haojun Liao 已提交
302
    return pRequest->code;
X
Xiaoyu Wang 已提交
303
  }
304

305
  if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_CREATE_TABLE == pRequest->type) {
D
dapan1121 已提交
306
    pRequest->body.resInfo.numOfRows = res.numOfRows;
dengyihao's avatar
test  
dengyihao 已提交
307

D
dapan1121 已提交
308 309
    if (pRequest->body.queryJob != 0) {
      schedulerFreeJob(pRequest->body.queryJob);
D
dapan1121 已提交
310 311
    }
  }
D
dapan1121 已提交
312

D
dapan1121 已提交
313
  *pRes = res.res;
D
dapan 已提交
314

D
dapan1121 已提交
315
  pRequest->code = res.code;
L
Liu Jicong 已提交
316
  terrno = res.code;
D
dapan1121 已提交
317
  return pRequest->code;
X
Xiaoyu Wang 已提交
318
}
X
Xiaoyu Wang 已提交
319

D
dapan1121 已提交
320 321 322 323 324
int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList) {
  *pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
  return getPlan(pRequest, pQuery, &pRequest->body.pDag, *pNodeList);
}

D
dapan1121 已提交
325 326 327
int32_t validateSversion(SRequestObj* pRequest, void* res) {
  SArray* pArray = NULL;
  int32_t code = 0;
dengyihao's avatar
dengyihao 已提交
328

D
dapan1121 已提交
329 330 331 332 333 334 335 336 337 338 339
  if (TDMT_VND_SUBMIT == pRequest->type) {
    SSubmitRsp* pRsp = (SSubmitRsp*)res;
    if (pRsp->nBlocks <= 0) {
      return TSDB_CODE_SUCCESS;
    }

    pArray = taosArrayInit(pRsp->nBlocks, sizeof(STbSVersion));
    if (NULL == pArray) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return TSDB_CODE_OUT_OF_MEMORY;
    }
dengyihao's avatar
dengyihao 已提交
340

D
dapan1121 已提交
341
    for (int32_t i = 0; i < pRsp->nBlocks; ++i) {
dengyihao's avatar
dengyihao 已提交
342 343
      SSubmitBlkRsp* blk = pRsp->pBlocks + i;
      STbSVersion    tbSver = {.tbFName = blk->tblFName, .sver = blk->sver};
D
dapan1121 已提交
344 345 346
      taosArrayPush(pArray, &tbSver);
    }
  } else if (TDMT_VND_QUERY == pRequest->type) {
347 348 349 350 351 352 353 354 355 356 357
    SArray* pTbArray = (SArray*)res;
    int32_t tbNum = taosArrayGetSize(pTbArray);
    if (tbNum <= 0) {
      return TSDB_CODE_SUCCESS;
    }

    pArray = taosArrayInit(tbNum, sizeof(STbSVersion));
    if (NULL == pArray) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return TSDB_CODE_OUT_OF_MEMORY;
    }
D
dapan1121 已提交
358

359 360 361 362 363
    for (int32_t i = 0; i < tbNum; ++i) {
      STbVerInfo* tbInfo = taosArrayGet(pTbArray, i);
      STbSVersion tbSver = {.tbFName = tbInfo->tbFName, .sver = tbInfo->sversion};
      taosArrayPush(pArray, &tbSver);
    }
D
dapan1121 已提交
364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379
  }

  SCatalog* pCatalog = NULL;
  CHECK_CODE_GOTO(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog), _return);

  SEpSet epset = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);

  code = catalogChkTbMetaVersion(pCatalog, pRequest->pTscObj->pAppInfo->pTransporter, &epset, pArray);

_return:

  taosArrayDestroy(pArray);

  return code;
}

D
dapan1121 已提交
380 381 382 383
void freeRequestRes(SRequestObj* pRequest, void* res) {
  if (NULL == res) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
384

D
dapan1121 已提交
385 386 387
  if (TDMT_VND_SUBMIT == pRequest->type) {
    tFreeSSubmitRsp((SSubmitRsp*)res);
  } else if (TDMT_VND_QUERY == pRequest->type) {
388
    taosArrayDestroy((SArray *)res);
D
dapan1121 已提交
389 390
  }
}
D
dapan1121 已提交
391

D
dapan 已提交
392
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery, void** res) {
D
dapan1121 已提交
393 394
  void* pRes = NULL;

X
Xiaoyu Wang 已提交
395 396 397 398 399 400 401 402
  if (TSDB_CODE_SUCCESS == code) {
    switch (pQuery->execMode) {
      case QUERY_EXEC_MODE_LOCAL:
        code = execLocalCmd(pRequest, pQuery);
        break;
      case QUERY_EXEC_MODE_RPC:
        code = execDdlQuery(pRequest, pQuery);
        break;
X
Xiaoyu Wang 已提交
403 404
      case QUERY_EXEC_MODE_SCHEDULE: {
        SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
X
Xiaoyu Wang 已提交
405 406
        code = getPlan(pRequest, pQuery, &pRequest->body.pDag, pNodeList);
        if (TSDB_CODE_SUCCESS == code) {
D
dapan1121 已提交
407 408 409 410
          code = scheduleQuery(pRequest, pRequest->body.pDag, pNodeList, &pRes);
          if (NULL != pRes) {
            code = validateSversion(pRequest, pRes);
          }
X
Xiaoyu Wang 已提交
411
        }
X
Xiaoyu Wang 已提交
412
        taosArrayDestroy(pNodeList);
X
Xiaoyu Wang 已提交
413
        break;
X
Xiaoyu Wang 已提交
414
      }
X
Xiaoyu Wang 已提交
415 416 417 418 419 420
      case QUERY_EXEC_MODE_EMPTY_RESULT:
        pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
        break;
      default:
        break;
    }
X
Xiaoyu Wang 已提交
421 422
  }

D
stmt  
dapan1121 已提交
423 424 425
  if (!keepQuery) {
    qDestroyQuery(pQuery);
  }
dengyihao's avatar
dengyihao 已提交
426

D
dapan1121 已提交
427
  if (NULL != pRequest && TSDB_CODE_SUCCESS != code) {
X
Xiaoyu Wang 已提交
428
    pRequest->code = terrno;
D
dapan1121 已提交
429 430 431 432 433 434
    freeRequestRes(pRequest, pRes);
    pRes = NULL;
  }

  if (res) {
    *res = pRes;
X
Xiaoyu Wang 已提交
435
  }
436

437 438 439
  return pRequest;
}

D
stmt  
dapan1121 已提交
440 441 442 443 444
SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
  SRequestObj* pRequest = NULL;
  SQuery*      pQuery = NULL;

  int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest);
445 446 447 448 449 450 451
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  code = parseSql(pRequest, false, &pQuery, NULL);
  if (code != TSDB_CODE_SUCCESS) {
452 453
    pRequest->code = code;
    return pRequest;
D
stmt  
dapan1121 已提交
454 455
  }

D
dapan 已提交
456
  return launchQueryImpl(pRequest, pQuery, code, false, NULL);
D
stmt  
dapan1121 已提交
457 458
}

D
dapan1121 已提交
459
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
L
Liu Jicong 已提交
460 461 462 463
  SCatalog* pCatalog = NULL;
  int32_t   code = 0;
  int32_t   dbNum = taosArrayGetSize(pRequest->dbList);
  int32_t   tblNum = taosArrayGetSize(pRequest->tableList);
D
dapan1121 已提交
464 465 466 467

  if (dbNum <= 0 && tblNum <= 0) {
    return TSDB_CODE_QRY_APP_ERROR;
  }
L
Liu Jicong 已提交
468

D
dapan1121 已提交
469 470 471 472 473 474 475 476
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SEpSet epset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);

  for (int32_t i = 0; i < dbNum; ++i) {
L
Liu Jicong 已提交
477 478
    char* dbFName = taosArrayGet(pRequest->dbList, i);

D
dapan1121 已提交
479 480 481
    code = catalogRefreshDBVgInfo(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, dbFName);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
D
dapan1121 已提交
482 483 484
    }
  }

D
dapan1121 已提交
485
  for (int32_t i = 0; i < tblNum; ++i) {
L
Liu Jicong 已提交
486
    SName* tableName = taosArrayGet(pRequest->tableList, i);
D
dapan1121 已提交
487 488 489 490 491

    code = catalogRefreshTableMeta(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, tableName, -1);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
D
dapan1121 已提交
492 493
  }

D
dapan1121 已提交
494
  return code;
D
dapan1121 已提交
495 496 497 498
}

SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
  SRequestObj* pRequest = NULL;
L
Liu Jicong 已提交
499 500
  int32_t      retryNum = 0;
  int32_t      code = 0;
D
dapan1121 已提交
501

D
dapan1121 已提交
502
  while (retryNum++ < REQUEST_MAX_TRY_TIMES) {
D
stmt  
dapan1121 已提交
503
    pRequest = launchQuery(pTscObj, sql, sqlLen);
504
    if (pRequest == NULL || TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) {
D
dapan1121 已提交
505 506 507
      break;
    }

D
dapan1121 已提交
508 509 510
    code = refreshMeta(pTscObj, pRequest);
    if (code) {
      pRequest->code = code;
D
dapan1121 已提交
511 512
      break;
    }
D
dapan1121 已提交
513 514

    destroyRequest(pRequest);
D
dapan1121 已提交
515
  }
L
Liu Jicong 已提交
516

D
dapan1121 已提交
517 518 519 520 521 522 523 524 525 526 527 528 529 530
  return pRequest;
}

TAOS_RES* taos_query_l(TAOS* taos, const char* sql, int sqlLen) {
  STscObj* pTscObj = (STscObj*)taos;
  if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
    tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    return NULL;
  }

  return execQuery(pTscObj, sql, sqlLen);
}

S
Shengliang Guan 已提交
531 532
int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
  pEpSet->version = 0;
533

H
Haojun Liao 已提交
534
  // init mnode ip set
dengyihao's avatar
dengyihao 已提交
535
  SEpSet* mgmtEpSet = &(pEpSet->epSet);
536
  mgmtEpSet->numOfEps = 0;
dengyihao's avatar
dengyihao 已提交
537
  mgmtEpSet->inUse = 0;
538

S
Shengliang Guan 已提交
539 540 541 542
  if (firstEp && firstEp[0] != 0) {
    if (strlen(firstEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
S
Shengliang Guan 已提交
543
    }
S
Shengliang Guan 已提交
544

545 546 547 548 549 550
    int32_t code = taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[0]);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return terrno;
    }

S
Shengliang Guan 已提交
551 552 553 554 555 556 557
    mgmtEpSet->numOfEps++;
  }

  if (secondEp && secondEp[0] != 0) {
    if (strlen(secondEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
558
    }
S
Shengliang Guan 已提交
559 560 561

    taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
    mgmtEpSet->numOfEps++;
562 563 564 565 566 567 568 569 570 571
  }

  if (mgmtEpSet->numOfEps == 0) {
    terrno = TSDB_CODE_TSC_INVALID_FQDN;
    return -1;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
572
STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
L
Liu Jicong 已提交
573
                         SAppInstInfo* pAppInfo, int connType) {
574
  STscObj* pTscObj = createTscObj(user, auth, db, connType, pAppInfo);
575 576 577 578 579
  if (NULL == pTscObj) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return pTscObj;
  }

dengyihao's avatar
dengyihao 已提交
580
  SRequestObj* pRequest = createRequest(pTscObj, fp, param, TDMT_MND_CONNECT);
581 582 583
  if (pRequest == NULL) {
    destroyTscObj(pTscObj);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
584 585 586
    return NULL;
  }

587
  SMsgSendInfo* body = buildConnectMsg(pRequest);
588 589

  int64_t transporterId = 0;
H
Haojun Liao 已提交
590
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
591 592 593

  tsem_wait(&pRequest->body.rspSem);
  if (pRequest->code != TSDB_CODE_SUCCESS) {
dengyihao's avatar
dengyihao 已提交
594 595
    const char* errorMsg =
        (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
596
    fprintf(stderr, "failed to connect to server, reason: %s\n\n", errorMsg);
597

598
    terrno = pRequest->code;
599 600 601 602
    destroyRequest(pRequest);
    taos_close(pTscObj);
    pTscObj = NULL;
  } else {
D
dapan1121 已提交
603
    tscDebug("0x%" PRIx64 " connection is opening, connId:%u, dnodeConn:%p, reqId:0x%" PRIx64, pTscObj->id,
dengyihao's avatar
dengyihao 已提交
604
             pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId);
605 606 607 608 609 610
    destroyRequest(pRequest);
  }

  return pTscObj;
}

611
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
wafwerar's avatar
wafwerar 已提交
612
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
613 614 615 616 617
  if (pMsgSendInfo == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

dengyihao's avatar
dengyihao 已提交
618
  pMsgSendInfo->msgType = TDMT_MND_CONNECT;
619

620
  pMsgSendInfo->requestObjRefId = pRequest->self;
dengyihao's avatar
dengyihao 已提交
621 622 623
  pMsgSendInfo->requestId = pRequest->requestId;
  pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)];
  pMsgSendInfo->param = pRequest;
624

S
Shengliang Guan 已提交
625 626
  SConnectReq connectReq = {0};
  STscObj*    pObj = pRequest->pTscObj;
627

H
Haojun Liao 已提交
628
  char* db = getDbOfConnection(pObj);
629
  if (db != NULL) {
S
Shengliang Guan 已提交
630
    tstrncpy(connectReq.db, db, sizeof(connectReq.db));
631
  }
wafwerar's avatar
wafwerar 已提交
632
  taosMemoryFreeClear(db);
633

X
Xiaoyu Wang 已提交
634 635
  connectReq.connType = pObj->connType;
  connectReq.pid = htonl(appInfo.pid);
S
Shengliang Guan 已提交
636
  connectReq.startTime = htobe64(appInfo.startTime);
637

S
Shengliang Guan 已提交
638
  tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));
dengyihao's avatar
dengyihao 已提交
639 640
  tstrncpy(connectReq.user, pObj->user, sizeof(connectReq.user));
  tstrncpy(connectReq.passwd, pObj->pass, sizeof(connectReq.passwd));
S
Shengliang Guan 已提交
641 642

  int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
wafwerar's avatar
wafwerar 已提交
643
  void*   pReq = taosMemoryMalloc(contLen);
S
Shengliang Guan 已提交
644
  tSerializeSConnectReq(pReq, contLen, &connectReq);
645

S
Shengliang Guan 已提交
646 647
  pMsgSendInfo->msgInfo.len = contLen;
  pMsgSendInfo->msgInfo.pData = pReq;
648
  return pMsgSendInfo;
649 650
}

651
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
H
Haojun Liao 已提交
652
  assert(pMsgBody != NULL);
wafwerar's avatar
wafwerar 已提交
653 654
  taosMemoryFreeClear(pMsgBody->msgInfo.pData);
  taosMemoryFreeClear(pMsgBody);
H
Haojun Liao 已提交
655
}
656

dengyihao's avatar
dengyihao 已提交
657
bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType) {
L
Liu Jicong 已提交
658 659
  return msgType == TDMT_VND_QUERY_RSP || msgType == TDMT_VND_FETCH_RSP || msgType == TDMT_VND_RES_READY_RSP ||
         msgType == TDMT_VND_QUERY_HEARTBEAT_RSP;
dengyihao's avatar
dengyihao 已提交
660
}
661

662
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
S
Shengliang Guan 已提交
663 664
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
  assert(pMsg->info.ahandle != NULL);
665

666
  if (pSendInfo->requestObjRefId != 0) {
dengyihao's avatar
dengyihao 已提交
667
    SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
668
    assert(pRequest->self == pSendInfo->requestObjRefId);
669

D
dapan1121 已提交
670
    pRequest->metric.rsp = taosGetTimestampUs();
671

dengyihao's avatar
dengyihao 已提交
672
    STscObj* pTscObj = pRequest->pTscObj;
673 674 675 676
    if (pEpSet) {
      if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, pEpSet)) {
        updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
      }
677 678
    }

679
    /*
dengyihao's avatar
dengyihao 已提交
680 681
     * There is not response callback function for submit response.
     * The actual inserted number of points is the first number.
682
     */
683
    int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start;
684
    if (pMsg->code == TSDB_CODE_SUCCESS) {
dengyihao's avatar
dengyihao 已提交
685
      tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%" PRIx64, pRequest->self,
L
Liu Jicong 已提交
686
               TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed / 1000, pRequest->requestId);
687
    } else {
dengyihao's avatar
dengyihao 已提交
688
      tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%" PRIx64, pRequest->self,
L
Liu Jicong 已提交
689
               TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed / 1000, pRequest->requestId);
690
    }
691

H
Haojun Liao 已提交
692
    taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
693 694
  }

S
Shengliang Guan 已提交
695
  SDataBuf buf = {.len = pMsg->contLen, .pData = NULL, .handle = pMsg->info.handle};
696 697

  if (pMsg->contLen > 0) {
wafwerar's avatar
wafwerar 已提交
698
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
699 700 701 702 703 704
    if (buf.pData == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    }
705 706
  }

707
  pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
708
  rpcFreeCont(pMsg->pCont);
709
  destroySendMsgInfo(pSendInfo);
710
}
711

dengyihao's avatar
dengyihao 已提交
712
TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) {
713 714 715 716 717 718 719 720 721 722
  tscDebug("try to connect to %s:%u by auth, user:%s db:%s", ip, port, user, db);
  if (user == NULL) {
    user = TSDB_DEFAULT_USER;
  }

  if (auth == NULL) {
    tscError("No auth info is given, failed to connect to server");
    return NULL;
  }

L
Liu Jicong 已提交
723
  return taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY);
724 725
}

dengyihao's avatar
dengyihao 已提交
726 727 728
TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen,
                     const char* db, int dbLen, uint16_t port) {
  char ipStr[TSDB_EP_LEN] = {0};
729
  char dbStr[TSDB_DB_NAME_LEN] = {0};
dengyihao's avatar
dengyihao 已提交
730 731
  char userStr[TSDB_USER_LEN] = {0};
  char passStr[TSDB_PASSWORD_LEN] = {0};
732

dengyihao's avatar
dengyihao 已提交
733
  strncpy(ipStr, ip, TMIN(TSDB_EP_LEN - 1, ipLen));
dengyihao's avatar
dengyihao 已提交
734 735
  strncpy(userStr, user, TMIN(TSDB_USER_LEN - 1, userLen));
  strncpy(passStr, pass, TMIN(TSDB_PASSWORD_LEN - 1, passLen));
dengyihao's avatar
dengyihao 已提交
736
  strncpy(dbStr, db, TMIN(TSDB_DB_NAME_LEN - 1, dbLen));
737
  return taos_connect(ipStr, userStr, passStr, dbStr, port);
738 739
}

L
Liu Jicong 已提交
740
void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
741 742 743 744 745 746 747
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
    SResultColumn* pCol = &pResultInfo->pCol[i];

    int32_t type = pResultInfo->fields[i].type;
    int32_t bytes = pResultInfo->fields[i].bytes;

    if (IS_VAR_DATA_TYPE(type)) {
748
      if (pCol->offset[pResultInfo->current] != -1) {
749 750 751 752 753 754
        char* pStart = pResultInfo->pCol[i].offset[pResultInfo->current] + pResultInfo->pCol[i].pData;

        pResultInfo->length[i] = varDataLen(pStart);
        pResultInfo->row[i] = varDataVal(pStart);
      } else {
        pResultInfo->row[i] = NULL;
755
        pResultInfo->length[i] = 0;
756 757 758 759
      }
    } else {
      if (!colDataIsNull_f(pCol->nullbitmap, pResultInfo->current)) {
        pResultInfo->row[i] = pResultInfo->pCol[i].pData + bytes * pResultInfo->current;
760
        pResultInfo->length[i] = bytes;
761 762
      } else {
        pResultInfo->row[i] = NULL;
763
        pResultInfo->length[i] = 0;
764 765 766 767 768
      }
    }
  }
}

769
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
770
  assert(pRequest != NULL);
H
Haojun Liao 已提交
771

H
Haojun Liao 已提交
772
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
H
Haojun Liao 已提交
773
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
H
Haojun Liao 已提交
774
    // All data has returned to App already, no need to try again
775 776 777 778 779
    if (pResultInfo->completed) {
      pResultInfo->numOfRows = 0;
      return NULL;
    }

H
Haojun Liao 已提交
780 781 782 783 784 785
    SReqResultInfo* pResInfo = &pRequest->body.resInfo;
    pRequest->code = schedulerFetchRows(pRequest->body.queryJob, (void**)&pResInfo->pData);
    if (pRequest->code != TSDB_CODE_SUCCESS) {
      pResultInfo->numOfRows = 0;
      return NULL;
    }
H
Haojun Liao 已提交
786

H
Haojun Liao 已提交
787 788 789 790 791
    pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData, convertUcs4);
    if (pRequest->code != TSDB_CODE_SUCCESS) {
      pResultInfo->numOfRows = 0;
      return NULL;
    }
H
Haojun Liao 已提交
792

H
Haojun Liao 已提交
793 794
    tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
             pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId);
H
Haojun Liao 已提交
795

H
Haojun Liao 已提交
796
    if (pResultInfo->numOfRows == 0) {
H
Haojun Liao 已提交
797 798 799 800
      return NULL;
    }
  }

801 802 803
  if (setupOneRowPtr) {
    doSetOneRowPtr(pResultInfo);
    pResultInfo->current += 1;
H
Haojun Liao 已提交
804 805 806 807 808
  }

  return pResultInfo->row;
}

809
static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
H
Haojun Liao 已提交
810
  if (pResInfo->row == NULL) {
L
Liu Jicong 已提交
811 812
    pResInfo->row = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
    pResInfo->pCol = taosMemoryCalloc(pResInfo->numOfCols, sizeof(SResultColumn));
wafwerar's avatar
wafwerar 已提交
813 814
    pResInfo->length = taosMemoryCalloc(pResInfo->numOfCols, sizeof(int32_t));
    pResInfo->convertBuf = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
815

816 817 818
    if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL || pResInfo->convertBuf == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
819
  }
820 821

  return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
822 823
}

824 825 826 827 828
static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int32_t numOfCols, int32_t* colLength) {
  for (int32_t i = 0; i < numOfCols; ++i) {
    int32_t type = pResultInfo->fields[i].type;
    int32_t bytes = pResultInfo->fields[i].bytes;

829
    if (type == TSDB_DATA_TYPE_NCHAR && colLength[i] > 0) {
830 831 832 833 834 835 836 837 838 839 840 841 842 843
      char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]);
      if (p == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      pResultInfo->convertBuf[i] = p;

      SResultColumn* pCol = &pResultInfo->pCol[i];
      for (int32_t j = 0; j < numOfRows; ++j) {
        if (pCol->offset[j] != -1) {
          char* pStart = pCol->offset[j] + pCol->pData;

          int32_t len = taosUcs4ToMbs((TdUcs4*)varDataVal(pStart), varDataLen(pStart), varDataVal(p));
          ASSERT(len <= bytes);
D
stmt  
dapan1121 已提交
844
          ASSERT((p + len) < (pResultInfo->convertBuf[i] + colLength[i]));
dengyihao's avatar
dengyihao 已提交
845

846 847 848 849 850 851 852 853 854
          varDataSetLen(p, len);
          pCol->offset[j] = (p - pResultInfo->convertBuf[i]);
          p += (len + VARSTR_HEADER_SIZE);
        }
      }

      pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
      pResultInfo->row[i] = pResultInfo->pCol[i].pData;
    }
855 856 857 858 859 860 861 862

    if (type == TSDB_DATA_TYPE_JSON) {
      char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]);
      if (p == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      pResultInfo->convertBuf[i] = p;
dengyihao's avatar
dengyihao 已提交
863
      int32_t        len = 0;
864 865 866 867 868 869
      SResultColumn* pCol = &pResultInfo->pCol[i];
      for (int32_t j = 0; j < numOfRows; ++j) {
        if (pCol->offset[j] != -1) {
          char* pStart = pCol->offset[j] + pCol->pData;

          int32_t jsonInnerType = *pStart;
dengyihao's avatar
dengyihao 已提交
870 871 872
          char*   jsonInnerData = pStart + CHAR_BYTES;
          char    dst[TSDB_MAX_JSON_TAG_LEN] = {0};
          if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
873 874
            sprintf(varDataVal(dst), "%s", TSDB_DATA_NULL_STR_L);
            varDataSetLen(dst, strlen(varDataVal(dst)));
dengyihao's avatar
dengyihao 已提交
875 876 877
          } else if (jsonInnerType == TSDB_DATA_TYPE_JSON) {
            int32_t length =
                taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData), varDataVal(dst));
878 879

            if (length <= 0) {
dengyihao's avatar
dengyihao 已提交
880 881
              tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset,
                       varDataVal(jsonInnerData));
882 883 884
              length = 0;
            }
            varDataSetLen(dst, length);
dengyihao's avatar
dengyihao 已提交
885
          } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
886
            *(char*)varDataVal(dst) = '\"';
dengyihao's avatar
dengyihao 已提交
887 888
            int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData),
                                           varDataVal(dst) + CHAR_BYTES);
889
            if (length <= 0) {
dengyihao's avatar
dengyihao 已提交
890 891
              tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset,
                       varDataVal(jsonInnerData));
892 893
              length = 0;
            }
dengyihao's avatar
dengyihao 已提交
894
            varDataSetLen(dst, length + CHAR_BYTES * 2);
895
            *(char*)(varDataVal(dst), length + CHAR_BYTES) = '\"';
dengyihao's avatar
dengyihao 已提交
896
          } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
897 898 899
            double jsonVd = *(double*)(jsonInnerData);
            sprintf(varDataVal(dst), "%.9lf", jsonVd);
            varDataSetLen(dst, strlen(varDataVal(dst)));
dengyihao's avatar
dengyihao 已提交
900
          } else if (jsonInnerType == TSDB_DATA_TYPE_BIGINT) {
901 902 903
            int64_t jsonVd = *(int64_t*)(jsonInnerData);
            sprintf(varDataVal(dst), "%" PRId64, jsonVd);
            varDataSetLen(dst, strlen(varDataVal(dst)));
dengyihao's avatar
dengyihao 已提交
904 905
          } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
            sprintf(varDataVal(dst), "%s", (*((char*)jsonInnerData) == 1) ? "true" : "false");
906
            varDataSetLen(dst, strlen(varDataVal(dst)));
dengyihao's avatar
dengyihao 已提交
907
          } else {
908 909 910
            ASSERT(0);
          }

dengyihao's avatar
dengyihao 已提交
911
          if (len + varDataTLen(dst) > colLength[i]) {
912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928
            p = taosMemoryRealloc(pResultInfo->convertBuf[i], len + varDataTLen(dst));
            if (p == NULL) {
              return TSDB_CODE_OUT_OF_MEMORY;
            }

            pResultInfo->convertBuf[i] = p;
          }
          p = pResultInfo->convertBuf[i] + len;
          memcpy(p, dst, varDataTLen(dst));
          pCol->offset[j] = len;
          len += varDataTLen(dst);
        }
      }

      pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
      pResultInfo->row[i] = pResultInfo->pCol[i].pData;
    }
929 930 931 932 933
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
934 935
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows,
                         bool convertUcs4) {
H
Haojun Liao 已提交
936 937
  assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL);
  if (numOfRows == 0) {
938
    return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
939 940
  }

941 942 943 944
  int32_t code = doPrepareResPtr(pResultInfo);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }
H
Haojun Liao 已提交
945

L
Liu Jicong 已提交
946
  char* p = (char*)pResultInfo->pData;
947

L
Liu Jicong 已提交
948
  int32_t dataLen = *(int32_t*)p;
949 950
  p += sizeof(int32_t);

L
Liu Jicong 已提交
951
  uint64_t groupId = *(uint64_t*)p;
952 953 954 955 956 957
  p += sizeof(uint64_t);

  int32_t* colLength = (int32_t*)p;
  p += sizeof(int32_t) * numOfCols;

  char* pStart = p;
H
Haojun Liao 已提交
958
  for (int32_t i = 0; i < numOfCols; ++i) {
959
    colLength[i] = htonl(colLength[i]);
960
    ASSERT(colLength[i] < dataLen);
961 962 963 964 965 966 967 968 969 970

    if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
      pResultInfo->pCol[i].offset = (int32_t*)pStart;
      pStart += numOfRows * sizeof(int32_t);
    } else {
      pResultInfo->pCol[i].nullbitmap = pStart;
      pStart += BitmapLen(pResultInfo->numOfRows);
    }

    pResultInfo->pCol[i].pData = pStart;
H
Haojun Liao 已提交
971
    pResultInfo->length[i] = pResultInfo->fields[i].bytes;
972 973 974
    pResultInfo->row[i] = pResultInfo->pCol[i].pData;

    pStart += colLength[i];
975
  }
976

H
Haojun Liao 已提交
977
  // convert UCS4-LE encoded character to native multi-bytes character in current data block.
978 979
  if (convertUcs4) {
    code = doConvertUCS4(pResultInfo, numOfRows, numOfCols, colLength);
980 981
  }

982
  return code;
S
Shengliang Guan 已提交
983 984
}

H
Haojun Liao 已提交
985
char* getDbOfConnection(STscObj* pObj) {
dengyihao's avatar
dengyihao 已提交
986
  char* p = NULL;
wafwerar's avatar
wafwerar 已提交
987
  taosThreadMutexLock(&pObj->mutex);
988 989 990 991
  size_t len = strlen(pObj->db);
  if (len > 0) {
    p = strndup(pObj->db, tListLen(pObj->db));
  }
S
Shengliang Guan 已提交
992

wafwerar's avatar
wafwerar 已提交
993
  taosThreadMutexUnlock(&pObj->mutex);
994 995 996 997 998
  return p;
}

void setConnectionDB(STscObj* pTscObj, const char* db) {
  assert(db != NULL && pTscObj != NULL);
wafwerar's avatar
wafwerar 已提交
999
  taosThreadMutexLock(&pTscObj->mutex);
1000
  tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
wafwerar's avatar
wafwerar 已提交
1001
  taosThreadMutexUnlock(&pTscObj->mutex);
1002
}
S
Shengliang Guan 已提交
1003

H
Haojun Liao 已提交
1004 1005 1006 1007 1008 1009 1010 1011 1012 1013
void resetConnectDB(STscObj* pTscObj) {
  if (pTscObj == NULL) {
    return;
  }

  taosThreadMutexLock(&pTscObj->mutex);
  pTscObj->db[0] = 0;
  taosThreadMutexUnlock(&pTscObj->mutex);
}

1014
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4) {
1015 1016
  assert(pResultInfo != NULL && pRsp != NULL);

L
Liu Jicong 已提交
1017 1018 1019 1020 1021
  pResultInfo->pRspMsg = (const char*)pRsp;
  pResultInfo->pData = (void*)pRsp->data;
  pResultInfo->numOfRows = htonl(pRsp->numOfRows);
  pResultInfo->current = 0;
  pResultInfo->completed = (pRsp->completed == 1);
1022
  pResultInfo->payloadLen = htonl(pRsp->compLen);
L
Liu Jicong 已提交
1023
  pResultInfo->precision = pRsp->precision;
H
Haojun Liao 已提交
1024

1025
  // TODO handle the compressed case
1026
  pResultInfo->totalRows += pResultInfo->numOfRows;
1027 1028
  return setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows,
                          convertUcs4);
L
fix  
Liu Jicong 已提交
1029
}
S
Shengliang Guan 已提交
1030 1031 1032 1033 1034 1035

TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* details, int maxlen) {
  TSDB_SERVER_STATUS code = TSDB_SRV_STATUS_UNAVAILABLE;
  void*              clientRpc = NULL;
  SServerStatusRsp   statusRsp = {0};
  SEpSet             epSet = {.inUse = 0, .numOfEps = 1};
S
Shengliang Guan 已提交
1036
  SRpcMsg            rpcMsg = {.info.ahandle = (void*)0x9526, .msgType = TDMT_DND_SERVER_STATUS};
S
Shengliang Guan 已提交
1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054
  SRpcMsg            rpcRsp = {0};
  SRpcInit           rpcInit = {0};
  char               pass[TSDB_PASSWORD_LEN + 1] = {0};

  rpcInit.label = "CHK";
  rpcInit.numOfThreads = 1;
  rpcInit.cfp = NULL;
  rpcInit.sessions = 16;
  rpcInit.connType = TAOS_CONN_CLIENT;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.user = "_dnd";

  clientRpc = rpcOpen(&rpcInit);
  if (clientRpc == NULL) {
    tscError("failed to init server status client");
    goto _OVER;
  }

S
Shengliang Guan 已提交
1055 1056 1057 1058 1059 1060 1061 1062
  if (fqdn == NULL) {
    fqdn = tsLocalFqdn;
  }

  if (port == 0) {
    port = tsServerPort;
  }

S
Shengliang Guan 已提交
1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077
  tstrncpy(epSet.eps[0].fqdn, fqdn, TSDB_FQDN_LEN);
  epSet.eps[0].port = (uint16_t)port;
  rpcSendRecv(clientRpc, &epSet, &rpcMsg, &rpcRsp);

  if (rpcRsp.code != 0 || rpcRsp.contLen <= 0 || rpcRsp.pCont == NULL) {
    tscError("failed to send server status req since %s", terrstr());
    goto _OVER;
  }

  if (tDeserializeSServerStatusRsp(rpcRsp.pCont, rpcRsp.contLen, &statusRsp) != 0) {
    tscError("failed to parse server status rsp since %s", terrstr());
    goto _OVER;
  }

  code = statusRsp.statusCode;
S
Shengliang Guan 已提交
1078
  if (details != NULL && statusRsp.details != NULL) {
S
Shengliang Guan 已提交
1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090
    tstrncpy(details, statusRsp.details, maxlen);
  }

_OVER:
  if (clientRpc != NULL) {
    rpcClose(clientRpc);
  }
  if (rpcRsp.pCont != NULL) {
    rpcFreeCont(rpcRsp.pCont);
  }
  return code;
}