clientImpl.c 41.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15

16
#include "cJSON.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "command.h"
20
#include "scheduler.h"
H
Haojun Liao 已提交
21
#include "tdatablock.h"
22
#include "tdataformat.h"
23
#include "tdef.h"
24
#include "tglobal.h"
25
#include "tmsgtype.h"
H
Haojun Liao 已提交
26
#include "tpagedbuf.h"
27
#include "tref.h"
X
Xiaoyu Wang 已提交
28

S
Shengliang Guan 已提交
29
static int32_t       initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
30
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest);
dengyihao's avatar
dengyihao 已提交
31
static void          destroySendMsgInfo(SMsgSendInfo* pMsgBody);
H
Haojun Liao 已提交
32

33
static bool stringLengthCheck(const char* str, size_t maxsize) {
34 35 36 37 38 39 40 41 42 43 44 45
  if (str == NULL) {
    return false;
  }

  size_t len = strlen(str);
  if (len <= 0 || len > maxsize) {
    return false;
  }

  return true;
}

dengyihao's avatar
dengyihao 已提交
46
static bool validateUserName(const char* user) { return stringLengthCheck(user, TSDB_USER_LEN - 1); }
47

dengyihao's avatar
dengyihao 已提交
48
static bool validatePassword(const char* passwd) { return stringLengthCheck(passwd, TSDB_PASSWORD_LEN - 1); }
49

dengyihao's avatar
dengyihao 已提交
50
static bool validateDbName(const char* db) { return stringLengthCheck(db, TSDB_DB_NAME_LEN - 1); }
51

52 53 54 55 56 57
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
  char key[512] = {0};
  snprintf(key, sizeof(key), "%s:%s:%s:%d", user, auth, ip, port);
  return strdup(key);
}

dengyihao's avatar
dengyihao 已提交
58
static STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
L
Liu Jicong 已提交
59
                                SAppInstInfo* pAppInfo, int connType);
60

dengyihao's avatar
dengyihao 已提交
61
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
L
Liu Jicong 已提交
62
                            uint16_t port, int connType) {
63 64 65 66
  if (taos_init() != TSDB_CODE_SUCCESS) {
    return NULL;
  }

67 68 69 70 71
  if (!validateUserName(user)) {
    terrno = TSDB_CODE_TSC_INVALID_USER_LENGTH;
    return NULL;
  }

72
  char localDb[TSDB_DB_NAME_LEN] = {0};
H
Haojun Liao 已提交
73
  if (db != NULL && strlen(db) > 0) {
dengyihao's avatar
dengyihao 已提交
74
    if (!validateDbName(db)) {
75 76 77 78
      terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH;
      return NULL;
    }

79 80
    tstrncpy(localDb, db, sizeof(localDb));
    strdequote(localDb);
81 82
  }

83
  char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
84 85 86 87 88 89
  if (auth == NULL) {
    if (!validatePassword(pass)) {
      terrno = TSDB_CODE_TSC_INVALID_PASS_LENGTH;
      return NULL;
    }

dengyihao's avatar
dengyihao 已提交
90
    taosEncryptPass_c((uint8_t*)pass, strlen(pass), secretEncrypt);
91 92 93 94
  } else {
    tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt));
  }

95
  SCorEpSet epSet = {0};
S
Shengliang Guan 已提交
96 97 98 99 100 101 102 103 104
  if (ip) {
    if (initEpSetFromCfg(ip, NULL, &epSet) < 0) {
      return NULL;
    }
  } else {
    if (initEpSetFromCfg(tsFirst, tsSecond, &epSet) < 0) {
      return NULL;
    }
  }
105

106 107
  if (port) {
    epSet.epSet.eps[0].port = port;
S
Shengliang Guan 已提交
108
    epSet.epSet.eps[1].port = port;
109 110
  }

111
  char* key = getClusterKey(user, secretEncrypt, ip, port);
112

113
  SAppInstInfo** pInst = NULL;
wafwerar's avatar
wafwerar 已提交
114
  taosThreadMutexLock(&appInfo.mutex);
H
Haojun Liao 已提交
115 116

  pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
L
Liu Jicong 已提交
117
  SAppInstInfo* p = NULL;
118
  if (pInst == NULL) {
wafwerar's avatar
wafwerar 已提交
119
    p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo));
dengyihao's avatar
dengyihao 已提交
120
    p->mgmtEp = epSet;
H
Haojun Liao 已提交
121
    p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
D
dapan 已提交
122
    p->pAppHbMgr = appHbMgrInit(p, key);
H
Haojun Liao 已提交
123
    taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
124

H
Haojun Liao 已提交
125
    pInst = &p;
126 127
  }

wafwerar's avatar
wafwerar 已提交
128
  taosThreadMutexUnlock(&appInfo.mutex);
H
Haojun Liao 已提交
129

wafwerar's avatar
wafwerar 已提交
130
  taosMemoryFreeClear(key);
L
Liu Jicong 已提交
131
  return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst, connType);
132 133
}

dengyihao's avatar
dengyihao 已提交
134
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest) {
X
Xiaoyu Wang 已提交
135 136
  *pRequest = createRequest(pTscObj, NULL, NULL, TSDB_SQL_SELECT);
  if (*pRequest == NULL) {
137
    tscError("failed to malloc sqlObj");
X
Xiaoyu Wang 已提交
138
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
139 140
  }

wafwerar's avatar
wafwerar 已提交
141
  (*pRequest)->sqlstr = taosMemoryMalloc(sqlLen + 1);
X
Xiaoyu Wang 已提交
142
  if ((*pRequest)->sqlstr == NULL) {
dengyihao's avatar
dengyihao 已提交
143
    tscError("0x%" PRIx64 " failed to prepare sql string buffer", (*pRequest)->self);
X
Xiaoyu Wang 已提交
144 145
    (*pRequest)->msgBuf = strdup("failed to prepare sql string buffer");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
146 147
  }

X
Xiaoyu Wang 已提交
148 149 150
  strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen);
  (*pRequest)->sqlstr[sqlLen] = 0;
  (*pRequest)->sqlLen = sqlLen;
151

dengyihao's avatar
dengyihao 已提交
152 153
  if (taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self,
                  sizeof((*pRequest)->self))) {
D
dapan1121 已提交
154 155 156 157 158 159
    destroyRequest(*pRequest);
    *pRequest = NULL;
    tscError("put request to request hash failed");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

dengyihao's avatar
dengyihao 已提交
160
  tscDebugL("0x%" PRIx64 " SQL: %s, reqId:0x%" PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId);
X
Xiaoyu Wang 已提交
161 162
  return TSDB_CODE_SUCCESS;
}
163

D
stmt  
dapan1121 已提交
164
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb) {
H
Haojun Liao 已提交
165 166
  STscObj* pTscObj = pRequest->pTscObj;

167 168 169 170 171 172 173 174 175 176
  SParseContext cxt = {.requestId = pRequest->requestId,
                       .acctId = pTscObj->acctId,
                       .db = pRequest->pDb,
                       .topicQuery = topicQuery,
                       .pSql = pRequest->sqlstr,
                       .sqlLen = pRequest->sqlLen,
                       .pMsg = pRequest->msgBuf,
                       .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
                       .pTransporter = pTscObj->pAppInfo->pTransporter,
                       .pStmtCb = pStmtCb,
177 178
                       .pUser = pTscObj->user,
                       .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER))};
H
Haojun Liao 已提交
179

H
Haojun Liao 已提交
180 181
  cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
H
Haojun Liao 已提交
182 183 184 185
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

X
Xiaoyu Wang 已提交
186
  code = qParseSql(&cxt, pQuery);
X
Xiaoyu Wang 已提交
187 188 189
  if (TSDB_CODE_SUCCESS == code) {
    if ((*pQuery)->haveResultSet) {
      setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols);
190
      setResPrecision(&pRequest->body.resInfo, (*pQuery)->precision);
X
Xiaoyu Wang 已提交
191
    }
192 193
  }
  if (TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) {
wafwerar's avatar
wafwerar 已提交
194 195
    TSWAP(pRequest->dbList, (*pQuery)->pDbList);
    TSWAP(pRequest->tableList, (*pQuery)->pTableList);
X
Xiaoyu Wang 已提交
196
  }
197

X
Xiaoyu Wang 已提交
198 199
  return code;
}
H
Haojun Liao 已提交
200

201 202
int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
  SRetrieveTableRsp* pRsp = NULL;
L
Liu Jicong 已提交
203
  int32_t            code = qExecCommand(pQuery->pRoot, &pRsp);
204
  if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
205
    code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false);
206 207 208 209
  }
  return code;
}

X
Xiaoyu Wang 已提交
210
int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
X
Xiaoyu Wang 已提交
211 212 213 214 215
  // drop table if exists not_exists_table
  if (NULL == pQuery->pCmdMsg) {
    return TSDB_CODE_SUCCESS;
  }

216 217 218
  SCmdMsgInfo* pMsgInfo = pQuery->pCmdMsg;
  pRequest->type = pMsgInfo->msgType;
  pRequest->body.requestMsg = (SDataBuf){.pData = pMsgInfo->pMsg, .len = pMsgInfo->msgLen, .handle = NULL};
L
Liu Jicong 已提交
219
  pMsgInfo->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management
220 221 222

  STscObj*      pTscObj = pRequest->pTscObj;
  SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
223

224 225
  int64_t transporterId = 0;
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, &transporterId, pSendMsg);
X
Xiaoyu Wang 已提交
226

227
  tsem_wait(&pRequest->body.rspSem);
X
Xiaoyu Wang 已提交
228 229
  return TSDB_CODE_SUCCESS;
}
X
Xiaoyu Wang 已提交
230

231 232
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
  pRequest->type = pQuery->msgType;
L
Liu Jicong 已提交
233 234 235 236 237 238
  SPlanContext cxt = {.queryId = pRequest->requestId,
                      .acctId = pRequest->pTscObj->acctId,
                      .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
                      .pAstRoot = pQuery->pRoot,
                      .showRewrite = pQuery->showRewrite,
                      .pMsg = pRequest->msgBuf,
239
                      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE};
240 241 242 243 244 245
  SEpSet       mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
  SCatalog*    pCatalog = NULL;
  int32_t      code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
  if (TSDB_CODE_SUCCESS == code) {
    code = catalogGetQnodeList(pCatalog, pRequest->pTscObj->pAppInfo->pTransporter, &mgmtEpSet, pNodeList);
  }
X
Xiaoyu Wang 已提交
246 247
  if (TSDB_CODE_SUCCESS == code) {
    code = qCreateQueryPlan(&cxt, pPlan, pNodeList);
X
Xiaoyu Wang 已提交
248 249
  }
  return code;
X
Xiaoyu Wang 已提交
250 251
}

252
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols) {
253
  ASSERT(pSchema != NULL && numOfCols > 0);
254

255
  pResInfo->numOfCols = numOfCols;
L
Liu Jicong 已提交
256 257 258 259 260 261
  if (pResInfo->fields != NULL) {
    taosMemoryFree(pResInfo->fields);
  }
  if (pResInfo->userFields != NULL) {
    taosMemoryFree(pResInfo->userFields);
  }
262 263
  pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
  pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
264 265

  for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
266
    pResInfo->fields[i].bytes = pSchema[i].bytes;
L
Liu Jicong 已提交
267
    pResInfo->fields[i].type = pSchema[i].type;
268 269

    pResInfo->userFields[i].bytes = pSchema[i].bytes;
L
Liu Jicong 已提交
270
    pResInfo->userFields[i].type = pSchema[i].type;
271 272 273

    if (pSchema[i].type == TSDB_DATA_TYPE_VARCHAR) {
      pResInfo->userFields[i].bytes -= VARSTR_HEADER_SIZE;
wmmhello's avatar
wmmhello 已提交
274
    } else if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR || pSchema[i].type == TSDB_DATA_TYPE_JSON) {
275 276 277
      pResInfo->userFields[i].bytes = (pResInfo->userFields[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
    }

278
    tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
279
    tstrncpy(pResInfo->userFields[i].name, pSchema[i].name, tListLen(pResInfo->userFields[i].name));
280
  }
X
Xiaoyu Wang 已提交
281 282
}

283
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
dengyihao's avatar
dengyihao 已提交
284 285
  if (precision != TSDB_TIME_PRECISION_MILLI && precision != TSDB_TIME_PRECISION_MICRO &&
      precision != TSDB_TIME_PRECISION_NANO) {
286 287 288 289 290 291
    return;
  }

  pResInfo->precision = precision;
}

D
dapan1121 已提交
292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337
int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList, void** pRes) {
  void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
  
  tsem_init(&schdRspSem, 0, 0);

  SQueryResult res = {.code = 0, .numOfRows = 0};
  int32_t      code = schedulerAsyncExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr,
                                       pRequest->metric.start, schdExecCallback, &res);
  while (true) {                                       
    if (code != TSDB_CODE_SUCCESS) {
      if (pRequest->body.queryJob != 0) {
        schedulerFreeJob(pRequest->body.queryJob);
      }

      *pRes = res.res;

      pRequest->code = code;
      terrno = code;
      return pRequest->code;
    } else {
      tsem_wait(&schdRspSem);
      
      if (res.code) {
        code = res.code;
      } else {
        break;
      }
    }
  }

  if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_CREATE_TABLE == pRequest->type) {
    pRequest->body.resInfo.numOfRows = res.numOfRows;

    if (pRequest->body.queryJob != 0) {
      schedulerFreeJob(pRequest->body.queryJob);
    }
  }

  *pRes = res.res;

  pRequest->code = res.code;
  terrno = res.code;
  return pRequest->code;
}


D
dapan 已提交
338
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList, void** pRes) {
339
  void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
340

D
dapan1121 已提交
341
  SQueryResult res = {.code = 0, .numOfRows = 0};
L
Liu Jicong 已提交
342
  int32_t      code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr,
343
                                       pRequest->metric.start, &res);
D
dapan1121 已提交
344
  if (code != TSDB_CODE_SUCCESS) {
D
dapan1121 已提交
345 346
    if (pRequest->body.queryJob != 0) {
      schedulerFreeJob(pRequest->body.queryJob);
347 348
    }

D
dapan1121 已提交
349 350
    *pRes = res.res;

D
dapan1121 已提交
351
    pRequest->code = code;
D
dapan1121 已提交
352
    terrno = code;
H
Haojun Liao 已提交
353
    return pRequest->code;
X
Xiaoyu Wang 已提交
354
  }
355

356
  if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_CREATE_TABLE == pRequest->type) {
D
dapan1121 已提交
357
    pRequest->body.resInfo.numOfRows = res.numOfRows;
dengyihao's avatar
test  
dengyihao 已提交
358

D
dapan1121 已提交
359 360
    if (pRequest->body.queryJob != 0) {
      schedulerFreeJob(pRequest->body.queryJob);
D
dapan1121 已提交
361 362
    }
  }
D
dapan1121 已提交
363

D
dapan1121 已提交
364
  *pRes = res.res;
D
dapan 已提交
365

D
dapan1121 已提交
366
  pRequest->code = res.code;
L
Liu Jicong 已提交
367
  terrno = res.code;
D
dapan1121 已提交
368
  return pRequest->code;
X
Xiaoyu Wang 已提交
369
}
X
Xiaoyu Wang 已提交
370

D
dapan1121 已提交
371 372 373 374 375
int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList) {
  *pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
  return getPlan(pRequest, pQuery, &pRequest->body.pDag, *pNodeList);
}

D
dapan1121 已提交
376 377 378
int32_t validateSversion(SRequestObj* pRequest, void* res) {
  SArray* pArray = NULL;
  int32_t code = 0;
dengyihao's avatar
dengyihao 已提交
379

D
dapan1121 已提交
380 381 382 383 384 385 386 387 388 389 390
  if (TDMT_VND_SUBMIT == pRequest->type) {
    SSubmitRsp* pRsp = (SSubmitRsp*)res;
    if (pRsp->nBlocks <= 0) {
      return TSDB_CODE_SUCCESS;
    }

    pArray = taosArrayInit(pRsp->nBlocks, sizeof(STbSVersion));
    if (NULL == pArray) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return TSDB_CODE_OUT_OF_MEMORY;
    }
dengyihao's avatar
dengyihao 已提交
391

D
dapan1121 已提交
392
    for (int32_t i = 0; i < pRsp->nBlocks; ++i) {
dengyihao's avatar
dengyihao 已提交
393
      SSubmitBlkRsp* blk = pRsp->pBlocks + i;
D
dapan1121 已提交
394 395 396
      if (NULL == blk->tblFName || 0 == blk->tblFName[0]) {
        continue;
      }
397 398

      STbSVersion tbSver = {.tbFName = blk->tblFName, .sver = blk->sver};
D
dapan1121 已提交
399 400 401
      taosArrayPush(pArray, &tbSver);
    }
  } else if (TDMT_VND_QUERY == pRequest->type) {
402 403 404 405 406 407 408 409 410 411 412
    SArray* pTbArray = (SArray*)res;
    int32_t tbNum = taosArrayGetSize(pTbArray);
    if (tbNum <= 0) {
      return TSDB_CODE_SUCCESS;
    }

    pArray = taosArrayInit(tbNum, sizeof(STbSVersion));
    if (NULL == pArray) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return TSDB_CODE_OUT_OF_MEMORY;
    }
D
dapan1121 已提交
413

414 415
    for (int32_t i = 0; i < tbNum; ++i) {
      STbVerInfo* tbInfo = taosArrayGet(pTbArray, i);
D
dapan1121 已提交
416
      STbSVersion tbSver = {.tbFName = tbInfo->tbFName, .sver = tbInfo->sversion, .tver = tbInfo->tversion};
417 418
      taosArrayPush(pArray, &tbSver);
    }
D
dapan1121 已提交
419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434
  }

  SCatalog* pCatalog = NULL;
  CHECK_CODE_GOTO(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog), _return);

  SEpSet epset = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);

  code = catalogChkTbMetaVersion(pCatalog, pRequest->pTscObj->pAppInfo->pTransporter, &epset, pArray);

_return:

  taosArrayDestroy(pArray);

  return code;
}

D
dapan1121 已提交
435
void freeRequestRes(SRequestObj* pRequest, void* res) {
D
dapan1121 已提交
436
  if (NULL == pRequest || NULL == res) {
D
dapan1121 已提交
437 438
    return;
  }
dengyihao's avatar
dengyihao 已提交
439

D
dapan1121 已提交
440 441 442
  if (TDMT_VND_SUBMIT == pRequest->type) {
    tFreeSSubmitRsp((SSubmitRsp*)res);
  } else if (TDMT_VND_QUERY == pRequest->type) {
L
Liu Jicong 已提交
443
    taosArrayDestroy((SArray*)res);
D
dapan1121 已提交
444 445
  }
}
D
dapan1121 已提交
446

D
dapan 已提交
447
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery, void** res) {
D
dapan1121 已提交
448 449
  void* pRes = NULL;

X
Xiaoyu Wang 已提交
450 451 452 453 454 455 456 457
  if (TSDB_CODE_SUCCESS == code) {
    switch (pQuery->execMode) {
      case QUERY_EXEC_MODE_LOCAL:
        code = execLocalCmd(pRequest, pQuery);
        break;
      case QUERY_EXEC_MODE_RPC:
        code = execDdlQuery(pRequest, pQuery);
        break;
X
Xiaoyu Wang 已提交
458 459
      case QUERY_EXEC_MODE_SCHEDULE: {
        SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
X
Xiaoyu Wang 已提交
460 461
        code = getPlan(pRequest, pQuery, &pRequest->body.pDag, pNodeList);
        if (TSDB_CODE_SUCCESS == code) {
D
dapan1121 已提交
462 463 464 465
          code = scheduleQuery(pRequest, pRequest->body.pDag, pNodeList, &pRes);
          if (NULL != pRes) {
            code = validateSversion(pRequest, pRes);
          }
X
Xiaoyu Wang 已提交
466
        }
X
Xiaoyu Wang 已提交
467
        taosArrayDestroy(pNodeList);
X
Xiaoyu Wang 已提交
468
        break;
X
Xiaoyu Wang 已提交
469
      }
X
Xiaoyu Wang 已提交
470 471 472 473 474 475
      case QUERY_EXEC_MODE_EMPTY_RESULT:
        pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
        break;
      default:
        break;
    }
X
Xiaoyu Wang 已提交
476 477
  }

D
stmt  
dapan1121 已提交
478 479 480
  if (!keepQuery) {
    qDestroyQuery(pQuery);
  }
dengyihao's avatar
dengyihao 已提交
481

D
dapan1121 已提交
482
  if (NULL != pRequest && TSDB_CODE_SUCCESS != code) {
X
Xiaoyu Wang 已提交
483
    pRequest->code = terrno;
D
dapan1121 已提交
484 485 486 487
  }

  if (res) {
    *res = pRes;
D
dapan1121 已提交
488 489 490
  } else {
    freeRequestRes(pRequest, pRes);
    pRes = NULL;
X
Xiaoyu Wang 已提交
491
  }
492

493 494 495
  return pRequest;
}

D
stmt  
dapan1121 已提交
496 497 498 499 500
SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
  SRequestObj* pRequest = NULL;
  SQuery*      pQuery = NULL;

  int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest);
501 502 503 504 505 506 507
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  code = parseSql(pRequest, false, &pQuery, NULL);
  if (code != TSDB_CODE_SUCCESS) {
508 509
    pRequest->code = code;
    return pRequest;
D
stmt  
dapan1121 已提交
510 511
  }

D
dapan 已提交
512
  return launchQueryImpl(pRequest, pQuery, code, false, NULL);
D
stmt  
dapan1121 已提交
513 514
}

D
dapan1121 已提交
515
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
L
Liu Jicong 已提交
516 517 518 519
  SCatalog* pCatalog = NULL;
  int32_t   code = 0;
  int32_t   dbNum = taosArrayGetSize(pRequest->dbList);
  int32_t   tblNum = taosArrayGetSize(pRequest->tableList);
D
dapan1121 已提交
520 521 522 523

  if (dbNum <= 0 && tblNum <= 0) {
    return TSDB_CODE_QRY_APP_ERROR;
  }
L
Liu Jicong 已提交
524

D
dapan1121 已提交
525 526 527 528 529 530 531 532
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SEpSet epset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);

  for (int32_t i = 0; i < dbNum; ++i) {
L
Liu Jicong 已提交
533 534
    char* dbFName = taosArrayGet(pRequest->dbList, i);

D
dapan1121 已提交
535 536 537
    code = catalogRefreshDBVgInfo(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, dbFName);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
D
dapan1121 已提交
538 539 540
    }
  }

D
dapan1121 已提交
541
  for (int32_t i = 0; i < tblNum; ++i) {
L
Liu Jicong 已提交
542
    SName* tableName = taosArrayGet(pRequest->tableList, i);
D
dapan1121 已提交
543 544 545 546 547

    code = catalogRefreshTableMeta(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, tableName, -1);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
D
dapan1121 已提交
548 549
  }

D
dapan1121 已提交
550
  return code;
D
dapan1121 已提交
551 552
}

D
dapan1121 已提交
553 554
int32_t removeMeta(STscObj* pTscObj, SArray* tbList) {
  SCatalog* pCatalog = NULL;
555 556
  int32_t   tbNum = taosArrayGetSize(tbList);
  int32_t   code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
D
dapan1121 已提交
557 558 559
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }
560

D
dapan1121 已提交
561 562 563 564 565 566 567 568
  for (int32_t i = 0; i < tbNum; ++i) {
    SName* pTbName = taosArrayGet(tbList, i);
    catalogRemoveTableMeta(pCatalog, pTbName);
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
569 570
SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
  SRequestObj* pRequest = NULL;
L
Liu Jicong 已提交
571 572
  int32_t      retryNum = 0;
  int32_t      code = 0;
D
dapan1121 已提交
573

574 575
  do {
    destroyRequest(pRequest);
D
stmt  
dapan1121 已提交
576
    pRequest = launchQuery(pTscObj, sql, sqlLen);
577
    if (pRequest == NULL || TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) {
D
dapan1121 已提交
578 579 580
      break;
    }

D
dapan1121 已提交
581 582 583
    code = refreshMeta(pTscObj, pRequest);
    if (code) {
      pRequest->code = code;
D
dapan1121 已提交
584 585
      break;
    }
586
  } while (retryNum++ < REQUEST_MAX_TRY_TIMES);
L
Liu Jicong 已提交
587

D
dapan1121 已提交
588 589 590
  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) {
    removeMeta(pTscObj, pRequest->tableList);
  }
591

D
dapan1121 已提交
592 593 594 595 596 597 598 599 600 601 602 603 604 605
  return pRequest;
}

TAOS_RES* taos_query_l(TAOS* taos, const char* sql, int sqlLen) {
  STscObj* pTscObj = (STscObj*)taos;
  if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
    tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    return NULL;
  }

  return execQuery(pTscObj, sql, sqlLen);
}

S
Shengliang Guan 已提交
606 607
int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
  pEpSet->version = 0;
608

H
Haojun Liao 已提交
609
  // init mnode ip set
dengyihao's avatar
dengyihao 已提交
610
  SEpSet* mgmtEpSet = &(pEpSet->epSet);
611
  mgmtEpSet->numOfEps = 0;
dengyihao's avatar
dengyihao 已提交
612
  mgmtEpSet->inUse = 0;
613

S
Shengliang Guan 已提交
614 615 616 617
  if (firstEp && firstEp[0] != 0) {
    if (strlen(firstEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
S
Shengliang Guan 已提交
618
    }
S
Shengliang Guan 已提交
619

620 621 622 623 624 625
    int32_t code = taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[0]);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return terrno;
    }

S
Shengliang Guan 已提交
626 627 628 629 630 631 632
    mgmtEpSet->numOfEps++;
  }

  if (secondEp && secondEp[0] != 0) {
    if (strlen(secondEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
633
    }
S
Shengliang Guan 已提交
634 635 636

    taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
    mgmtEpSet->numOfEps++;
637 638 639 640 641 642 643 644 645 646
  }

  if (mgmtEpSet->numOfEps == 0) {
    terrno = TSDB_CODE_TSC_INVALID_FQDN;
    return -1;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
647
STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
L
Liu Jicong 已提交
648
                         SAppInstInfo* pAppInfo, int connType) {
649
  STscObj* pTscObj = createTscObj(user, auth, db, connType, pAppInfo);
650 651 652 653 654
  if (NULL == pTscObj) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return pTscObj;
  }

dengyihao's avatar
dengyihao 已提交
655
  SRequestObj* pRequest = createRequest(pTscObj, fp, param, TDMT_MND_CONNECT);
656 657 658
  if (pRequest == NULL) {
    destroyTscObj(pTscObj);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
659 660 661
    return NULL;
  }

662
  SMsgSendInfo* body = buildConnectMsg(pRequest);
663 664

  int64_t transporterId = 0;
H
Haojun Liao 已提交
665
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
666 667 668

  tsem_wait(&pRequest->body.rspSem);
  if (pRequest->code != TSDB_CODE_SUCCESS) {
dengyihao's avatar
dengyihao 已提交
669 670
    const char* errorMsg =
        (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
671
    fprintf(stderr, "failed to connect to server, reason: %s\n\n", errorMsg);
672

673
    terrno = pRequest->code;
674 675 676 677
    destroyRequest(pRequest);
    taos_close(pTscObj);
    pTscObj = NULL;
  } else {
D
dapan1121 已提交
678
    tscDebug("0x%" PRIx64 " connection is opening, connId:%u, dnodeConn:%p, reqId:0x%" PRIx64, pTscObj->id,
dengyihao's avatar
dengyihao 已提交
679
             pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId);
680 681 682 683 684 685
    destroyRequest(pRequest);
  }

  return pTscObj;
}

686
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
wafwerar's avatar
wafwerar 已提交
687
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
688 689 690 691 692
  if (pMsgSendInfo == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

dengyihao's avatar
dengyihao 已提交
693
  pMsgSendInfo->msgType = TDMT_MND_CONNECT;
694

695
  pMsgSendInfo->requestObjRefId = pRequest->self;
dengyihao's avatar
dengyihao 已提交
696 697 698
  pMsgSendInfo->requestId = pRequest->requestId;
  pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)];
  pMsgSendInfo->param = pRequest;
699

S
Shengliang Guan 已提交
700 701
  SConnectReq connectReq = {0};
  STscObj*    pObj = pRequest->pTscObj;
702

H
Haojun Liao 已提交
703
  char* db = getDbOfConnection(pObj);
704
  if (db != NULL) {
S
Shengliang Guan 已提交
705
    tstrncpy(connectReq.db, db, sizeof(connectReq.db));
706
  }
wafwerar's avatar
wafwerar 已提交
707
  taosMemoryFreeClear(db);
708

X
Xiaoyu Wang 已提交
709 710
  connectReq.connType = pObj->connType;
  connectReq.pid = htonl(appInfo.pid);
S
Shengliang Guan 已提交
711
  connectReq.startTime = htobe64(appInfo.startTime);
712

S
Shengliang Guan 已提交
713
  tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));
dengyihao's avatar
dengyihao 已提交
714 715
  tstrncpy(connectReq.user, pObj->user, sizeof(connectReq.user));
  tstrncpy(connectReq.passwd, pObj->pass, sizeof(connectReq.passwd));
S
Shengliang Guan 已提交
716 717

  int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
wafwerar's avatar
wafwerar 已提交
718
  void*   pReq = taosMemoryMalloc(contLen);
S
Shengliang Guan 已提交
719
  tSerializeSConnectReq(pReq, contLen, &connectReq);
720

S
Shengliang Guan 已提交
721 722
  pMsgSendInfo->msgInfo.len = contLen;
  pMsgSendInfo->msgInfo.pData = pReq;
723
  return pMsgSendInfo;
724 725
}

726
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
H
Haojun Liao 已提交
727
  assert(pMsgBody != NULL);
wafwerar's avatar
wafwerar 已提交
728 729
  taosMemoryFreeClear(pMsgBody->msgInfo.pData);
  taosMemoryFreeClear(pMsgBody);
H
Haojun Liao 已提交
730
}
731

D
dapan1121 已提交
732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768
void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg, SEpSet* pEpSet) {
  if (NULL == pEpSet) {
    return;
  }
  
  switch (pSendInfo->target.type) {
    case TARGET_TYPE_MNODE:
      if (NULL == pTscObj) {
        tscError("mnode epset changed but not able to update it, reqObjRefId:%" PRIx64, pSendInfo->requestObjRefId);
        return;
      }

      updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);   
      break;
    case TARGET_TYPE_VNODE: {
      if (NULL == pTscObj) {
        tscError("vnode epset changed but not able to update it, reqObjRefId:%" PRIx64, pSendInfo->requestObjRefId);
        return;
      }

      SCatalog* pCatalog = NULL;
      int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
      if (code != TSDB_CODE_SUCCESS) {
        tscError("fail to get catalog handle, clusterId:%" PRIx64 ", error %s", pTscObj->pAppInfo->clusterId, tstrerror(code));
        return;
      }
    
      catalogUpdateVgEpSet(pCatalog, pSendInfo->target.dbFName, pSendInfo->target.vgId, pEpSet);
      break;
    }
    default:
      tscDebug("epset changed, not updated, msgType %s", TMSG_INFO(pMsg->msgType));
      break;
  }
}


769
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
S
Shengliang Guan 已提交
770 771
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
  assert(pMsg->info.ahandle != NULL);
D
dapan1121 已提交
772 773
  SRequestObj* pRequest = NULL;
  STscObj* pTscObj = NULL;
774

775
  if (pSendInfo->requestObjRefId != 0) {
dengyihao's avatar
dengyihao 已提交
776
    SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
777
    assert(pRequest->self == pSendInfo->requestObjRefId);
778

D
dapan1121 已提交
779
    pRequest->metric.rsp = taosGetTimestampUs();
D
dapan1121 已提交
780
    pTscObj = pRequest->pTscObj;
781
    /*
dengyihao's avatar
dengyihao 已提交
782 783
     * There is not response callback function for submit response.
     * The actual inserted number of points is the first number.
784
     */
785
    int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start;
786
    if (pMsg->code == TSDB_CODE_SUCCESS) {
dengyihao's avatar
dengyihao 已提交
787
      tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%" PRIx64, pRequest->self,
L
Liu Jicong 已提交
788
               TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed / 1000, pRequest->requestId);
789
    } else {
dengyihao's avatar
dengyihao 已提交
790
      tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%" PRIx64, pRequest->self,
L
Liu Jicong 已提交
791
               TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed / 1000, pRequest->requestId);
792
    }
793

H
Haojun Liao 已提交
794
    taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
795 796
  }

D
dapan1121 已提交
797 798
  updateTargetEpSet(pSendInfo, pTscObj, pMsg, pEpSet);

S
Shengliang Guan 已提交
799
  SDataBuf buf = {.len = pMsg->contLen, .pData = NULL, .handle = pMsg->info.handle};
800 801

  if (pMsg->contLen > 0) {
wafwerar's avatar
wafwerar 已提交
802
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
803 804 805 806 807 808
    if (buf.pData == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    }
809 810
  }

811
  pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
812
  rpcFreeCont(pMsg->pCont);
813
  destroySendMsgInfo(pSendInfo);
814
}
815

dengyihao's avatar
dengyihao 已提交
816
TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) {
817 818 819 820 821 822 823 824 825 826
  tscDebug("try to connect to %s:%u by auth, user:%s db:%s", ip, port, user, db);
  if (user == NULL) {
    user = TSDB_DEFAULT_USER;
  }

  if (auth == NULL) {
    tscError("No auth info is given, failed to connect to server");
    return NULL;
  }

L
Liu Jicong 已提交
827
  return taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY);
828 829
}

dengyihao's avatar
dengyihao 已提交
830 831 832
TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen,
                     const char* db, int dbLen, uint16_t port) {
  char ipStr[TSDB_EP_LEN] = {0};
833
  char dbStr[TSDB_DB_NAME_LEN] = {0};
dengyihao's avatar
dengyihao 已提交
834 835
  char userStr[TSDB_USER_LEN] = {0};
  char passStr[TSDB_PASSWORD_LEN] = {0};
836

dengyihao's avatar
dengyihao 已提交
837
  strncpy(ipStr, ip, TMIN(TSDB_EP_LEN - 1, ipLen));
dengyihao's avatar
dengyihao 已提交
838 839
  strncpy(userStr, user, TMIN(TSDB_USER_LEN - 1, userLen));
  strncpy(passStr, pass, TMIN(TSDB_PASSWORD_LEN - 1, passLen));
dengyihao's avatar
dengyihao 已提交
840
  strncpy(dbStr, db, TMIN(TSDB_DB_NAME_LEN - 1, dbLen));
841
  return taos_connect(ipStr, userStr, passStr, dbStr, port);
842 843
}

L
Liu Jicong 已提交
844
void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
845 846 847 848 849 850 851
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
    SResultColumn* pCol = &pResultInfo->pCol[i];

    int32_t type = pResultInfo->fields[i].type;
    int32_t bytes = pResultInfo->fields[i].bytes;

    if (IS_VAR_DATA_TYPE(type)) {
852
      if (pCol->offset[pResultInfo->current] != -1) {
853 854 855 856 857 858
        char* pStart = pResultInfo->pCol[i].offset[pResultInfo->current] + pResultInfo->pCol[i].pData;

        pResultInfo->length[i] = varDataLen(pStart);
        pResultInfo->row[i] = varDataVal(pStart);
      } else {
        pResultInfo->row[i] = NULL;
859
        pResultInfo->length[i] = 0;
860 861 862 863
      }
    } else {
      if (!colDataIsNull_f(pCol->nullbitmap, pResultInfo->current)) {
        pResultInfo->row[i] = pResultInfo->pCol[i].pData + bytes * pResultInfo->current;
864
        pResultInfo->length[i] = bytes;
865 866
      } else {
        pResultInfo->row[i] = NULL;
867
        pResultInfo->length[i] = 0;
868 869 870 871 872
      }
    }
  }
}

D
dapan1121 已提交
873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922
void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
  assert(pRequest != NULL);

  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
    // All data has returned to App already, no need to try again
    if (pResultInfo->completed) {
      pResultInfo->numOfRows = 0;
      return NULL;
    }

    tsem_init(&schdRspSem, 0, 0);

    SReqResultInfo* pResInfo = &pRequest->body.resInfo;
    SSchdFetchParam param = {.pData = (void**)&pResInfo->pData, .code = &pRequest->code};
    pRequest->code = schedulerAsyncFetchRows(pRequest->body.queryJob, schdFetchCallback, &param);
    if (pRequest->code != TSDB_CODE_SUCCESS) {
      pResultInfo->numOfRows = 0;
      return NULL;
    }

    tsem_wait(&schdRspSem);
    if (pRequest->code != TSDB_CODE_SUCCESS) {
      pResultInfo->numOfRows = 0;
      return NULL;
    }

    pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData, convertUcs4);
    if (pRequest->code != TSDB_CODE_SUCCESS) {
      pResultInfo->numOfRows = 0;
      return NULL;
    }

    tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
             pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId);

    if (pResultInfo->numOfRows == 0) {
      return NULL;
    }
  }

  if (setupOneRowPtr) {
    doSetOneRowPtr(pResultInfo);
    pResultInfo->current += 1;
  }

  return pResultInfo->row;
}


923
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
D
dapan1121 已提交
924
  //return doAsyncFetchRows(pRequest, setupOneRowPtr, convertUcs4);
925
  assert(pRequest != NULL);
H
Haojun Liao 已提交
926

H
Haojun Liao 已提交
927
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
H
Haojun Liao 已提交
928
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
H
Haojun Liao 已提交
929
    // All data has returned to App already, no need to try again
930 931 932 933 934
    if (pResultInfo->completed) {
      pResultInfo->numOfRows = 0;
      return NULL;
    }

H
Haojun Liao 已提交
935 936 937 938 939 940
    SReqResultInfo* pResInfo = &pRequest->body.resInfo;
    pRequest->code = schedulerFetchRows(pRequest->body.queryJob, (void**)&pResInfo->pData);
    if (pRequest->code != TSDB_CODE_SUCCESS) {
      pResultInfo->numOfRows = 0;
      return NULL;
    }
H
Haojun Liao 已提交
941

H
Haojun Liao 已提交
942 943 944 945 946
    pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData, convertUcs4);
    if (pRequest->code != TSDB_CODE_SUCCESS) {
      pResultInfo->numOfRows = 0;
      return NULL;
    }
H
Haojun Liao 已提交
947

H
Haojun Liao 已提交
948 949
    tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
             pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId);
H
Haojun Liao 已提交
950

H
Haojun Liao 已提交
951
    if (pResultInfo->numOfRows == 0) {
H
Haojun Liao 已提交
952 953 954 955
      return NULL;
    }
  }

956 957 958
  if (setupOneRowPtr) {
    doSetOneRowPtr(pResultInfo);
    pResultInfo->current += 1;
H
Haojun Liao 已提交
959 960 961 962 963
  }

  return pResultInfo->row;
}

964
static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
H
Haojun Liao 已提交
965
  if (pResInfo->row == NULL) {
L
Liu Jicong 已提交
966 967
    pResInfo->row = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
    pResInfo->pCol = taosMemoryCalloc(pResInfo->numOfCols, sizeof(SResultColumn));
wafwerar's avatar
wafwerar 已提交
968 969
    pResInfo->length = taosMemoryCalloc(pResInfo->numOfCols, sizeof(int32_t));
    pResInfo->convertBuf = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
970

971 972 973
    if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL || pResInfo->convertBuf == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
974
  }
975 976

  return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
977 978
}

979 980 981 982
static char* parseTagDatatoJson(void* p) {
  char*  string = NULL;
  cJSON* json = cJSON_CreateObject();
  if (json == NULL) {
wmmhello's avatar
wmmhello 已提交
983 984 985 986
    goto end;
  }

  int16_t nCols = kvRowNCols(p);
987
  char    tagJsonKey[256] = {0};
wmmhello's avatar
wmmhello 已提交
988
  for (int j = 0; j < nCols; ++j) {
989 990 991 992
    SColIdx* pColIdx = kvRowColIdxAt(p, j);
    char*    val = (char*)(kvRowColVal(p, pColIdx));
    if (j == 0) {
      if (*val == TSDB_DATA_TYPE_NULL) {
wmmhello's avatar
wmmhello 已提交
993
        string = taosMemoryCalloc(1, 8);
wmmhello's avatar
wmmhello 已提交
994
        sprintf(string, "%s", TSDB_DATA_NULL_STR_L);
wmmhello's avatar
wmmhello 已提交
995 996 997 998 999 1000 1001 1002 1003 1004 1005
        goto end;
      }
      continue;
    }

    // json key  encode by binary
    memset(tagJsonKey, 0, sizeof(tagJsonKey));
    memcpy(tagJsonKey, varDataVal(val), varDataLen(val));
    // json value
    val += varDataTLen(val);
    char* realData = POINTER_SHIFT(val, CHAR_BYTES);
1006 1007
    char  type = *val;
    if (type == TSDB_DATA_TYPE_NULL) {
wmmhello's avatar
wmmhello 已提交
1008
      cJSON* value = cJSON_CreateNull();
1009
      if (value == NULL) {
wmmhello's avatar
wmmhello 已提交
1010 1011 1012
        goto end;
      }
      cJSON_AddItemToObject(json, tagJsonKey, value);
1013
    } else if (type == TSDB_DATA_TYPE_NCHAR) {
wmmhello's avatar
wmmhello 已提交
1014
      cJSON* value = NULL;
1015 1016 1017
      if (varDataLen(realData) > 0) {
        char*   tagJsonValue = taosMemoryCalloc(varDataLen(realData), 1);
        int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(realData), varDataLen(realData), tagJsonValue);
wmmhello's avatar
wmmhello 已提交
1018
        if (length < 0) {
wmmhello's avatar
wmmhello 已提交
1019
          tscError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, val);
wmmhello's avatar
wmmhello 已提交
1020 1021 1022 1023 1024
          taosMemoryFree(tagJsonValue);
          goto end;
        }
        value = cJSON_CreateString(tagJsonValue);
        taosMemoryFree(tagJsonValue);
1025
        if (value == NULL) {
wmmhello's avatar
wmmhello 已提交
1026 1027
          goto end;
        }
1028
      } else if (varDataLen(realData) == 0) {
wmmhello's avatar
wmmhello 已提交
1029
        value = cJSON_CreateString("");
1030
      } else {
wmmhello's avatar
wmmhello 已提交
1031 1032 1033 1034
        ASSERT(0);
      }

      cJSON_AddItemToObject(json, tagJsonKey, value);
1035
    } else if (type == TSDB_DATA_TYPE_DOUBLE) {
wmmhello's avatar
wmmhello 已提交
1036 1037
      double jsonVd = *(double*)(realData);
      cJSON* value = cJSON_CreateNumber(jsonVd);
1038
      if (value == NULL) {
wmmhello's avatar
wmmhello 已提交
1039 1040 1041
        goto end;
      }
      cJSON_AddItemToObject(json, tagJsonKey, value);
1042 1043 1044 1045 1046 1047 1048 1049 1050 1051
      //    }else if(type == TSDB_DATA_TYPE_BIGINT){
      //      int64_t jsonVd = *(int64_t*)(realData);
      //      cJSON* value = cJSON_CreateNumber((double)jsonVd);
      //      if (value == NULL)
      //      {
      //        goto end;
      //      }
      //      cJSON_AddItemToObject(json, tagJsonKey, value);
    } else if (type == TSDB_DATA_TYPE_BOOL) {
      char   jsonVd = *(char*)(realData);
wmmhello's avatar
wmmhello 已提交
1052
      cJSON* value = cJSON_CreateBool(jsonVd);
1053
      if (value == NULL) {
wmmhello's avatar
wmmhello 已提交
1054 1055 1056
        goto end;
      }
      cJSON_AddItemToObject(json, tagJsonKey, value);
1057
    } else {
wmmhello's avatar
wmmhello 已提交
1058
      ASSERT(0);
wmmhello's avatar
wmmhello 已提交
1059 1060 1061 1062 1063 1064 1065 1066
    }
  }
  string = cJSON_PrintUnformatted(json);
end:
  cJSON_Delete(json);
  return string;
}

1067 1068 1069 1070 1071
static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int32_t numOfCols, int32_t* colLength) {
  for (int32_t i = 0; i < numOfCols; ++i) {
    int32_t type = pResultInfo->fields[i].type;
    int32_t bytes = pResultInfo->fields[i].bytes;

1072
    if (type == TSDB_DATA_TYPE_NCHAR && colLength[i] > 0) {
1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086
      char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]);
      if (p == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      pResultInfo->convertBuf[i] = p;

      SResultColumn* pCol = &pResultInfo->pCol[i];
      for (int32_t j = 0; j < numOfRows; ++j) {
        if (pCol->offset[j] != -1) {
          char* pStart = pCol->offset[j] + pCol->pData;

          int32_t len = taosUcs4ToMbs((TdUcs4*)varDataVal(pStart), varDataLen(pStart), varDataVal(p));
          ASSERT(len <= bytes);
D
stmt  
dapan1121 已提交
1087
          ASSERT((p + len) < (pResultInfo->convertBuf[i] + colLength[i]));
dengyihao's avatar
dengyihao 已提交
1088

1089 1090 1091 1092 1093 1094 1095 1096
          varDataSetLen(p, len);
          pCol->offset[j] = (p - pResultInfo->convertBuf[i]);
          p += (len + VARSTR_HEADER_SIZE);
        }
      }

      pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
      pResultInfo->row[i] = pResultInfo->pCol[i].pData;
1097
    } else if (type == TSDB_DATA_TYPE_JSON && colLength[i] > 0) {
1098 1099 1100 1101 1102 1103
      char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]);
      if (p == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      pResultInfo->convertBuf[i] = p;
dengyihao's avatar
dengyihao 已提交
1104
      int32_t        len = 0;
1105 1106 1107 1108 1109 1110
      SResultColumn* pCol = &pResultInfo->pCol[i];
      for (int32_t j = 0; j < numOfRows; ++j) {
        if (pCol->offset[j] != -1) {
          char* pStart = pCol->offset[j] + pCol->pData;

          int32_t jsonInnerType = *pStart;
dengyihao's avatar
dengyihao 已提交
1111 1112 1113
          char*   jsonInnerData = pStart + CHAR_BYTES;
          char    dst[TSDB_MAX_JSON_TAG_LEN] = {0};
          if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
1114 1115
            sprintf(varDataVal(dst), "%s", TSDB_DATA_NULL_STR_L);
            varDataSetLen(dst, strlen(varDataVal(dst)));
dengyihao's avatar
dengyihao 已提交
1116
          } else if (jsonInnerType == TSDB_DATA_TYPE_JSON) {
1117
            char* jsonString = parseTagDatatoJson(jsonInnerData);
wmmhello's avatar
wmmhello 已提交
1118 1119
            STR_TO_VARSTR(dst, jsonString);
            taosMemoryFree(jsonString);
dengyihao's avatar
dengyihao 已提交
1120
          } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
1121
            *(char*)varDataVal(dst) = '\"';
dengyihao's avatar
dengyihao 已提交
1122 1123
            int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData),
                                           varDataVal(dst) + CHAR_BYTES);
1124
            if (length <= 0) {
dengyihao's avatar
dengyihao 已提交
1125 1126
              tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset,
                       varDataVal(jsonInnerData));
1127 1128
              length = 0;
            }
dengyihao's avatar
dengyihao 已提交
1129
            varDataSetLen(dst, length + CHAR_BYTES * 2);
wmmhello's avatar
wmmhello 已提交
1130
            *(char*)POINTER_SHIFT(varDataVal(dst), length + CHAR_BYTES) = '\"';
dengyihao's avatar
dengyihao 已提交
1131
          } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
1132 1133 1134
            double jsonVd = *(double*)(jsonInnerData);
            sprintf(varDataVal(dst), "%.9lf", jsonVd);
            varDataSetLen(dst, strlen(varDataVal(dst)));
dengyihao's avatar
dengyihao 已提交
1135
          } else if (jsonInnerType == TSDB_DATA_TYPE_BIGINT) {
1136 1137 1138
            int64_t jsonVd = *(int64_t*)(jsonInnerData);
            sprintf(varDataVal(dst), "%" PRId64, jsonVd);
            varDataSetLen(dst, strlen(varDataVal(dst)));
dengyihao's avatar
dengyihao 已提交
1139 1140
          } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
            sprintf(varDataVal(dst), "%s", (*((char*)jsonInnerData) == 1) ? "true" : "false");
1141
            varDataSetLen(dst, strlen(varDataVal(dst)));
dengyihao's avatar
dengyihao 已提交
1142
          } else {
1143 1144 1145
            ASSERT(0);
          }

dengyihao's avatar
dengyihao 已提交
1146
          if (len + varDataTLen(dst) > colLength[i]) {
1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163
            p = taosMemoryRealloc(pResultInfo->convertBuf[i], len + varDataTLen(dst));
            if (p == NULL) {
              return TSDB_CODE_OUT_OF_MEMORY;
            }

            pResultInfo->convertBuf[i] = p;
          }
          p = pResultInfo->convertBuf[i] + len;
          memcpy(p, dst, varDataTLen(dst));
          pCol->offset[j] = len;
          len += varDataTLen(dst);
        }
      }

      pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
      pResultInfo->row[i] = pResultInfo->pCol[i].pData;
    }
1164 1165 1166 1167 1168
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1169 1170
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows,
                         bool convertUcs4) {
H
Haojun Liao 已提交
1171 1172
  assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL);
  if (numOfRows == 0) {
1173
    return TSDB_CODE_SUCCESS;
H
Haojun Liao 已提交
1174 1175
  }

1176 1177 1178 1179
  int32_t code = doPrepareResPtr(pResultInfo);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }
H
Haojun Liao 已提交
1180

L
Liu Jicong 已提交
1181
  char* p = (char*)pResultInfo->pData;
1182

L
Liu Jicong 已提交
1183
  int32_t dataLen = *(int32_t*)p;
1184 1185
  p += sizeof(int32_t);

L
Liu Jicong 已提交
1186
  uint64_t groupId = *(uint64_t*)p;
1187 1188 1189 1190 1191 1192
  p += sizeof(uint64_t);

  int32_t* colLength = (int32_t*)p;
  p += sizeof(int32_t) * numOfCols;

  char* pStart = p;
H
Haojun Liao 已提交
1193
  for (int32_t i = 0; i < numOfCols; ++i) {
1194
    colLength[i] = htonl(colLength[i]);
1195
    ASSERT(colLength[i] < dataLen);
1196 1197 1198 1199 1200 1201 1202 1203 1204 1205

    if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
      pResultInfo->pCol[i].offset = (int32_t*)pStart;
      pStart += numOfRows * sizeof(int32_t);
    } else {
      pResultInfo->pCol[i].nullbitmap = pStart;
      pStart += BitmapLen(pResultInfo->numOfRows);
    }

    pResultInfo->pCol[i].pData = pStart;
H
Haojun Liao 已提交
1206
    pResultInfo->length[i] = pResultInfo->fields[i].bytes;
1207 1208 1209
    pResultInfo->row[i] = pResultInfo->pCol[i].pData;

    pStart += colLength[i];
1210
  }
1211

H
Haojun Liao 已提交
1212
  // convert UCS4-LE encoded character to native multi-bytes character in current data block.
1213 1214
  if (convertUcs4) {
    code = doConvertUCS4(pResultInfo, numOfRows, numOfCols, colLength);
1215 1216
  }

1217
  return code;
S
Shengliang Guan 已提交
1218 1219
}

H
Haojun Liao 已提交
1220
char* getDbOfConnection(STscObj* pObj) {
dengyihao's avatar
dengyihao 已提交
1221
  char* p = NULL;
wafwerar's avatar
wafwerar 已提交
1222
  taosThreadMutexLock(&pObj->mutex);
1223 1224 1225 1226
  size_t len = strlen(pObj->db);
  if (len > 0) {
    p = strndup(pObj->db, tListLen(pObj->db));
  }
S
Shengliang Guan 已提交
1227

wafwerar's avatar
wafwerar 已提交
1228
  taosThreadMutexUnlock(&pObj->mutex);
1229 1230 1231 1232 1233
  return p;
}

void setConnectionDB(STscObj* pTscObj, const char* db) {
  assert(db != NULL && pTscObj != NULL);
wafwerar's avatar
wafwerar 已提交
1234
  taosThreadMutexLock(&pTscObj->mutex);
1235
  tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
wafwerar's avatar
wafwerar 已提交
1236
  taosThreadMutexUnlock(&pTscObj->mutex);
1237
}
S
Shengliang Guan 已提交
1238

H
Haojun Liao 已提交
1239 1240 1241 1242 1243 1244 1245 1246 1247 1248
void resetConnectDB(STscObj* pTscObj) {
  if (pTscObj == NULL) {
    return;
  }

  taosThreadMutexLock(&pTscObj->mutex);
  pTscObj->db[0] = 0;
  taosThreadMutexUnlock(&pTscObj->mutex);
}

1249
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4) {
1250 1251
  assert(pResultInfo != NULL && pRsp != NULL);

D
dapan1121 已提交
1252 1253
  taosMemoryFreeClear(pResultInfo->pRspMsg);
  
L
Liu Jicong 已提交
1254 1255 1256 1257 1258
  pResultInfo->pRspMsg = (const char*)pRsp;
  pResultInfo->pData = (void*)pRsp->data;
  pResultInfo->numOfRows = htonl(pRsp->numOfRows);
  pResultInfo->current = 0;
  pResultInfo->completed = (pRsp->completed == 1);
1259
  pResultInfo->payloadLen = htonl(pRsp->compLen);
L
Liu Jicong 已提交
1260
  pResultInfo->precision = pRsp->precision;
H
Haojun Liao 已提交
1261

1262
  // TODO handle the compressed case
1263
  pResultInfo->totalRows += pResultInfo->numOfRows;
1264 1265
  return setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows,
                          convertUcs4);
L
fix  
Liu Jicong 已提交
1266
}
S
Shengliang Guan 已提交
1267 1268 1269 1270 1271 1272

TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* details, int maxlen) {
  TSDB_SERVER_STATUS code = TSDB_SRV_STATUS_UNAVAILABLE;
  void*              clientRpc = NULL;
  SServerStatusRsp   statusRsp = {0};
  SEpSet             epSet = {.inUse = 0, .numOfEps = 1};
S
Shengliang Guan 已提交
1273
  SRpcMsg            rpcMsg = {.info.ahandle = (void*)0x9526, .msgType = TDMT_DND_SERVER_STATUS};
S
Shengliang Guan 已提交
1274 1275 1276 1277 1278 1279 1280 1281 1282 1283 1284 1285 1286 1287 1288 1289 1290 1291
  SRpcMsg            rpcRsp = {0};
  SRpcInit           rpcInit = {0};
  char               pass[TSDB_PASSWORD_LEN + 1] = {0};

  rpcInit.label = "CHK";
  rpcInit.numOfThreads = 1;
  rpcInit.cfp = NULL;
  rpcInit.sessions = 16;
  rpcInit.connType = TAOS_CONN_CLIENT;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.user = "_dnd";

  clientRpc = rpcOpen(&rpcInit);
  if (clientRpc == NULL) {
    tscError("failed to init server status client");
    goto _OVER;
  }

S
Shengliang Guan 已提交
1292 1293 1294 1295 1296 1297 1298 1299
  if (fqdn == NULL) {
    fqdn = tsLocalFqdn;
  }

  if (port == 0) {
    port = tsServerPort;
  }

S
Shengliang Guan 已提交
1300 1301 1302 1303 1304 1305 1306 1307 1308 1309 1310 1311 1312 1313 1314
  tstrncpy(epSet.eps[0].fqdn, fqdn, TSDB_FQDN_LEN);
  epSet.eps[0].port = (uint16_t)port;
  rpcSendRecv(clientRpc, &epSet, &rpcMsg, &rpcRsp);

  if (rpcRsp.code != 0 || rpcRsp.contLen <= 0 || rpcRsp.pCont == NULL) {
    tscError("failed to send server status req since %s", terrstr());
    goto _OVER;
  }

  if (tDeserializeSServerStatusRsp(rpcRsp.pCont, rpcRsp.contLen, &statusRsp) != 0) {
    tscError("failed to parse server status rsp since %s", terrstr());
    goto _OVER;
  }

  code = statusRsp.statusCode;
S
Shengliang Guan 已提交
1315
  if (details != NULL && statusRsp.details != NULL) {
S
Shengliang Guan 已提交
1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327
    tstrncpy(details, statusRsp.details, maxlen);
  }

_OVER:
  if (clientRpc != NULL) {
    rpcClose(clientRpc);
  }
  if (rpcRsp.pCont != NULL) {
    rpcFreeCont(rpcRsp.pCont);
  }
  return code;
}