clientImpl.c 50.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15

16
#include "cJSON.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "command.h"
20
#include "scheduler.h"
H
Haojun Liao 已提交
21
#include "tdatablock.h"
22
#include "tdataformat.h"
23
#include "tdef.h"
24
#include "tglobal.h"
25
#include "tmsgtype.h"
H
Haojun Liao 已提交
26
#include "tpagedbuf.h"
27
#include "tref.h"
X
Xiaoyu Wang 已提交
28

S
Shengliang Guan 已提交
29
static int32_t       initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
30
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest);
dengyihao's avatar
dengyihao 已提交
31
static void          destroySendMsgInfo(SMsgSendInfo* pMsgBody);
H
Haojun Liao 已提交
32

33
// Returns true only when `str` is a non-NULL, non-empty string whose length
// (excluding the terminating NUL) does not exceed `maxsize`.
static bool stringLengthCheck(const char* str, size_t maxsize) {
  if (NULL == str) {
    return false;
  }

  const size_t nBytes = strlen(str);
  return (nBytes != 0) && (nBytes <= maxsize);
}

dengyihao's avatar
dengyihao 已提交
46
// A user name is accepted when it is non-empty and at most TSDB_USER_LEN - 1 bytes long.
static bool validateUserName(const char* user) {
  const size_t maxLen = TSDB_USER_LEN - 1;
  return stringLengthCheck(user, maxLen);
}
47

dengyihao's avatar
dengyihao 已提交
48
// A password is accepted when it is non-empty and at most TSDB_PASSWORD_LEN - 1 bytes long.
static bool validatePassword(const char* passwd) {
  const size_t maxLen = TSDB_PASSWORD_LEN - 1;
  return stringLengthCheck(passwd, maxLen);
}
49

dengyihao's avatar
dengyihao 已提交
50
// A database name is accepted when it is non-empty and at most TSDB_DB_NAME_LEN - 1 bytes long.
static bool validateDbName(const char* db) {
  const size_t maxLen = TSDB_DB_NAME_LEN - 1;
  return stringLengthCheck(db, maxLen);
}
51

52 53 54 55 56 57
// Build the key "<user>:<auth>:<ip>:<port>" and return it as a heap copy
// produced by strdup() (NULL when the copy cannot be allocated). The caller
// owns the returned string.
//
// Any of the string arguments may be NULL; a NULL component is formatted as an
// empty string, because handing NULL to a "%s" conversion is undefined
// behaviour. The formatted key is truncated to fit the 512-byte scratch buffer.
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
  const char* safeUser = (user != NULL) ? user : "";
  const char* safeAuth = (auth != NULL) ? auth : "";
  const char* safeIp = (ip != NULL) ? ip : "";

  char key[512] = {0};
  snprintf(key, sizeof(key), "%s:%s:%s:%d", safeUser, safeAuth, safeIp, port);
  return strdup(key);
}

dengyihao's avatar
dengyihao 已提交
58
static STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
L
Liu Jicong 已提交
59
                                SAppInstInfo* pAppInfo, int connType);
60

61
STscObj* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
L
Liu Jicong 已提交
62
                            uint16_t port, int connType) {
63 64 65 66
  if (taos_init() != TSDB_CODE_SUCCESS) {
    return NULL;
  }

67 68 69 70 71
  if (!validateUserName(user)) {
    terrno = TSDB_CODE_TSC_INVALID_USER_LENGTH;
    return NULL;
  }

72
  char localDb[TSDB_DB_NAME_LEN] = {0};
H
Haojun Liao 已提交
73
  if (db != NULL && strlen(db) > 0) {
dengyihao's avatar
dengyihao 已提交
74
    if (!validateDbName(db)) {
75 76 77 78
      terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH;
      return NULL;
    }

79 80
    tstrncpy(localDb, db, sizeof(localDb));
    strdequote(localDb);
81 82
  }

83
  char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
84 85 86 87 88 89
  if (auth == NULL) {
    if (!validatePassword(pass)) {
      terrno = TSDB_CODE_TSC_INVALID_PASS_LENGTH;
      return NULL;
    }

dengyihao's avatar
dengyihao 已提交
90
    taosEncryptPass_c((uint8_t*)pass, strlen(pass), secretEncrypt);
91 92 93 94
  } else {
    tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt));
  }

95
  SCorEpSet epSet = {0};
S
Shengliang Guan 已提交
96 97 98 99 100 101 102 103 104
  if (ip) {
    if (initEpSetFromCfg(ip, NULL, &epSet) < 0) {
      return NULL;
    }
  } else {
    if (initEpSetFromCfg(tsFirst, tsSecond, &epSet) < 0) {
      return NULL;
    }
  }
105

106 107
  if (port) {
    epSet.epSet.eps[0].port = port;
S
Shengliang Guan 已提交
108
    epSet.epSet.eps[1].port = port;
109 110
  }

111
  char* key = getClusterKey(user, secretEncrypt, ip, port);
112

113
  SAppInstInfo** pInst = NULL;
wafwerar's avatar
wafwerar 已提交
114
  taosThreadMutexLock(&appInfo.mutex);
H
Haojun Liao 已提交
115 116

  pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
L
Liu Jicong 已提交
117
  SAppInstInfo* p = NULL;
118
  if (pInst == NULL) {
wafwerar's avatar
wafwerar 已提交
119
    p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo));
L
Liu Jicong 已提交
120
    p->mgmtEp = epSet;
D
dapan1121 已提交
121
    taosThreadMutexInit(&p->qnodeMutex, NULL);
H
Haojun Liao 已提交
122
    p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
D
dapan 已提交
123
    p->pAppHbMgr = appHbMgrInit(p, key);
H
Haojun Liao 已提交
124
    taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
125

H
Haojun Liao 已提交
126
    pInst = &p;
127 128
  }

wafwerar's avatar
wafwerar 已提交
129
  taosThreadMutexUnlock(&appInfo.mutex);
H
Haojun Liao 已提交
130

wafwerar's avatar
wafwerar 已提交
131
  taosMemoryFreeClear(key);
L
Liu Jicong 已提交
132
  return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst, connType);
133 134
}

dengyihao's avatar
dengyihao 已提交
135
// Create a request object for `sql`, store a lower-cased, NUL-terminated copy
// of the statement in it, and register the request id in pTscObj->pRequests.
//
// pTscObj   connection the request belongs to.
// sql       statement text; `sqlLen` bytes are read from it.
// sqlLen    length of `sql` in bytes.
// pRequest  out: the new request on success; NULL after any failure.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the request,
// the sql buffer or the hash entry cannot be created.
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest) {
  *pRequest = createRequest(pTscObj, TSDB_SQL_SELECT);
  if (*pRequest == NULL) {
    tscError("failed to malloc sqlObj");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  (*pRequest)->sqlstr = taosMemoryMalloc(sqlLen + 1);
  if ((*pRequest)->sqlstr == NULL) {
    tscError("0x%" PRIx64 " failed to prepare sql string buffer", (*pRequest)->self);
    // Release the half-built request, exactly like the hash-put failure path
    // below, so that no failure leaves a request behind for the caller.
    destroyRequest(*pRequest);
    *pRequest = NULL;
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen);
  (*pRequest)->sqlstr[sqlLen] = 0;
  (*pRequest)->sqlLen = sqlLen;

  // The request id serves as both key and value of the per-connection hash.
  if (taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self,
                  sizeof((*pRequest)->self))) {
    destroyRequest(*pRequest);
    *pRequest = NULL;
    tscError("put request to request hash failed");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  tscDebugL("0x%" PRIx64 " SQL: %s, reqId:0x%" PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId);
  return TSDB_CODE_SUCCESS;
}
164

D
stmt  
dapan1121 已提交
165
// Parse the statement stored in `pRequest` into `*pQuery`.
//
// A parse context is filled from the request and its connection, the catalog
// handle of the cluster is fetched, and qParseSql() is run. On success, when
// the query yields a result set, the result schema and precision are recorded
// in the request. The db/table lists are swapped into the request on success
// and also for errors matched by NEED_CLIENT_HANDLE_ERROR().
//
// Returns the code from catalogGetHandle() or qParseSql().
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb) {
  STscObj*   pTscObj = pRequest->pTscObj;
  const bool isSuper = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER));

  SParseContext cxt = {
      .requestId = pRequest->requestId,
      .requestRid = pRequest->self,
      .acctId = pTscObj->acctId,
      .db = pRequest->pDb,
      .topicQuery = topicQuery,
      .pSql = pRequest->sqlstr,
      .sqlLen = pRequest->sqlLen,
      .pMsg = pRequest->msgBuf,
      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
      .pTransporter = pTscObj->pAppInfo->pTransporter,
      .pStmtCb = pStmtCb,
      .pUser = pTscObj->user,
      .schemalessType = pTscObj->schemalessType,
      .isSuperUser = isSuper,
  };
  cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);

  int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  code = qParseSql(&cxt, pQuery);

  if (TSDB_CODE_SUCCESS == code && (*pQuery)->haveResultSet) {
    SReqResultInfo* pResInfo = &pRequest->body.resInfo;
    setResSchemaInfo(pResInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols);
    setResPrecision(pResInfo, (*pQuery)->precision);
  }

  if (TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) {
    TSWAP(pRequest->dbList, (*pQuery)->pDbList);
    TSWAP(pRequest->tableList, (*pQuery)->pTableList);
  }

  return code;
}
H
Haojun Liao 已提交
204

205 206
// Run a command locally through qExecCommand() and, when it produced a
// response, load that response into the request's result info.
// Returns the code of the last step that ran.
int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
  SRetrieveTableRsp* pRetrieveRsp = NULL;

  int32_t code = qExecCommand(pQuery->pRoot, &pRetrieveRsp);
  if (TSDB_CODE_SUCCESS != code || NULL == pRetrieveRsp) {
    return code;
  }

  return setQueryResultFromRsp(&pRequest->body.resInfo, pRetrieveRsp, false, true);
}

X
Xiaoyu Wang 已提交
215
// Send the command message carried by `pQuery` to the server and block on the
// request's response semaphore. Always returns TSDB_CODE_SUCCESS.
int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
  SCmdMsgInfo* pCmd = pQuery->pCmdMsg;
  if (NULL == pCmd) {
    // nothing to send, e.g. "drop table if exists not_exists_table"
    return TSDB_CODE_SUCCESS;
  }

  pRequest->type = pCmd->msgType;
  pRequest->body.requestMsg = (SDataBuf){.pData = pCmd->pMsg, .len = pCmd->msgLen, .handle = NULL};
  pCmd->pMsg = NULL;  // the payload is now managed through SMsgSendInfo

  STscObj*      pConn = pRequest->pTscObj;
  SMsgSendInfo* pSendInfo = buildMsgInfoImpl(pRequest);

  int64_t transporterId = 0;
  asyncSendMsgToServer(pConn->pAppInfo->pTransporter, &pCmd->epSet, &transporterId, pSendInfo);

  tsem_wait(&pRequest->body.rspSem);
  return TSDB_CODE_SUCCESS;
}
X
Xiaoyu Wang 已提交
235

236
// Shortcut for the application instance reachable through the request's connection object.
static SAppInstInfo* getAppInfo(SRequestObj* pRequest) {
  STscObj* pConn = pRequest->pTscObj;
  return pConn->pAppInfo;
}
237

238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258
// Run a command locally and report the outcome through the request's query
// callback.
//
// The result code of qExecCommand()/setQueryResultFromRsp() is stored in
// pRequest->code *before* it is inspected, so a failed local command is logged
// as a failure, leaves no rows in the result info, and is reported to the
// callback with its real error code.
void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
  SRetrieveTableRsp* pRsp = NULL;

  int32_t code = qExecCommand(pQuery->pRoot, &pRsp);
  if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
    code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false, false);
  }

  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
  pRequest->code = code;

  if (code != TSDB_CODE_SUCCESS) {
    pResultInfo->numOfRows = 0;
    tscError("0x%" PRIx64 " fetch results failed, code:%s, reqId:0x%" PRIx64, pRequest->self, tstrerror(code),
             pRequest->requestId);
  } else {
    tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
             pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed,
             pRequest->requestId);
  }

  // Hand the outcome to the caller, mirroring how schedulerExecCb() passes its code.
  pRequest->body.queryFp(pRequest->body.param, pRequest, code);
}

263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281
// Send the command message carried by `pQuery` to the server without waiting
// for the response. Always returns TSDB_CODE_SUCCESS.
int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
  SCmdMsgInfo* pCmd = pQuery->pCmdMsg;
  if (NULL == pCmd) {
    // nothing to send, e.g. "drop table if exists not_exists_table"
    return TSDB_CODE_SUCCESS;
  }

  pRequest->type = pCmd->msgType;
  pRequest->body.requestMsg = (SDataBuf){.pData = pCmd->pMsg, .len = pCmd->msgLen, .handle = NULL};
  pCmd->pMsg = NULL;  // the payload is now managed through SMsgSendInfo

  SAppInstInfo* pInst = getAppInfo(pRequest);
  SMsgSendInfo* pSendInfo = buildMsgInfoImpl(pRequest);

  int64_t transporterId = 0;
  asyncSendMsgToServer(pInst->pTransporter, &pCmd->epSet, &transporterId, pSendInfo);

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
282
int compareQueryNodeLoad(const void* elem1, const void* elem2) {
L
Liu Jicong 已提交
283 284
  SQueryNodeLoad* node1 = (SQueryNodeLoad*)elem1;
  SQueryNodeLoad* node2 = (SQueryNodeLoad*)elem2;
D
dapan1121 已提交
285 286 287 288

  if (node1->load < node2->load) {
    return -1;
  }
L
Liu Jicong 已提交
289

D
dapan1121 已提交
290 291 292
  return node1->load > node2->load;
}

L
Liu Jicong 已提交
293
// Replace the instance's cached qnode list, under qnodeMutex, with a copy of
// `pNodeList` sorted by compareQueryNodeLoad(). A NULL `pNodeList` just clears
// the cache. Always returns TSDB_CODE_SUCCESS.
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
  taosThreadMutexLock(&pInfo->qnodeMutex);

  SArray* pStale = pInfo->pQnodeList;
  if (NULL != pStale) {
    pInfo->pQnodeList = NULL;
    taosArrayDestroy(pStale);
  }

  if (NULL != pNodeList) {
    SArray* pFresh = taosArrayDup(pNodeList);
    taosArraySort(pFresh, compareQueryNodeLoad);
    pInfo->pQnodeList = pFresh;
  }

  taosThreadMutexUnlock(&pInfo->qnodeMutex);
  return TSDB_CODE_SUCCESS;
}

// Produce a qnode list for the request in `*pNodeList`.
//
// The instance's cached list is duplicated under qnodeMutex when one exists.
// If `*pNodeList` is still NULL afterwards, the list is fetched through the
// catalog and, on success, also stored in the cache via updateQnodeList().
// NOTE(review): `*pNodeList` is read without being reset here, so callers
// appear to be expected to initialise it to NULL.
int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) {
  SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
  int32_t       code = 0;

  taosThreadMutexLock(&pInst->qnodeMutex);
  if (pInst->pQnodeList) {
    *pNodeList = taosArrayDup(pInst->pQnodeList);
  }
  taosThreadMutexUnlock(&pInst->qnodeMutex);

  if (NULL != *pNodeList) {
    return code;
  }

  SCatalog* pCatalog = NULL;
  code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  *pNodeList = taosArrayInit(5, sizeof(SQueryNodeLoad));

  SRequestConnInfo conn = {
      .pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
      .requestId = pRequest->requestId,
      .requestObjRefId = pRequest->self,
      .mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
  };
  code = catalogGetQnodeList(pCatalog, &conn, *pNodeList);

  if (TSDB_CODE_SUCCESS == code && *pNodeList) {
    code = updateQnodeList(pInst, *pNodeList);
  }

  return code;
}

// Record the query's message type on the request, obtain the qnode list, and
// build the query plan for the parsed statement.
// Returns the first non-success code from getQnodeList()/qCreateQueryPlan().
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray** pNodeList) {
  pRequest->type = pQuery->msgType;

  SAppInstInfo* pInst = getAppInfo(pRequest);
  SPlanContext  planCxt = {
       .queryId = pRequest->requestId,
       .acctId = pRequest->pTscObj->acctId,
       .mgmtEpSet = getEpSet_s(&pInst->mgmtEp),
       .pAstRoot = pQuery->pRoot,
       .showRewrite = pQuery->showRewrite,
       .pMsg = pRequest->msgBuf,
       .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
  };

  int32_t code = getQnodeList(pRequest, pNodeList);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  return qCreateQueryPlan(&planCxt, pPlan, *pNodeList);
}

359
// Rebuild the `fields` and `userFields` arrays of `pResInfo` from `pSchema`.
//
// Both arrays receive the column type and name. `fields` keeps the schema byte
// width, while `userFields` has VARSTR_HEADER_SIZE removed for VARCHAR columns
// and is additionally divided by TSDB_NCHAR_SIZE for NCHAR/JSON columns.
// Previously attached arrays are freed first.
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols) {
  ASSERT(pSchema != NULL && numOfCols > 0);

  pResInfo->numOfCols = numOfCols;

  if (pResInfo->fields != NULL) {
    taosMemoryFree(pResInfo->fields);
  }
  if (pResInfo->userFields != NULL) {
    taosMemoryFree(pResInfo->userFields);
  }

  pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
  pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));

  for (int32_t col = 0; col < pResInfo->numOfCols; ++col) {
    const SSchema* pSrc = &pSchema[col];

    pResInfo->fields[col].bytes = pSrc->bytes;
    pResInfo->fields[col].type = pSrc->type;
    tstrncpy(pResInfo->fields[col].name, pSrc->name, tListLen(pResInfo->fields[col].name));

    pResInfo->userFields[col].bytes = pSrc->bytes;
    pResInfo->userFields[col].type = pSrc->type;
    tstrncpy(pResInfo->userFields[col].name, pSrc->name, tListLen(pResInfo->userFields[col].name));

    const bool isVarchar = (pSrc->type == TSDB_DATA_TYPE_VARCHAR);
    const bool isWide = (pSrc->type == TSDB_DATA_TYPE_NCHAR) || (pSrc->type == TSDB_DATA_TYPE_JSON);
    if (isVarchar) {
      pResInfo->userFields[col].bytes -= VARSTR_HEADER_SIZE;
    } else if (isWide) {
      pResInfo->userFields[col].bytes = (pResInfo->userFields[col].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
    }
  }
}

390
// Store `precision` in the result info when it is one of the milli/micro/nano
// precision constants; any other value is ignored.
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
  const bool known = (precision == TSDB_TIME_PRECISION_MILLI) || (precision == TSDB_TIME_PRECISION_MICRO) ||
                     (precision == TSDB_TIME_PRECISION_NANO);
  if (known) {
    pResInfo->precision = precision;
  }
}

D
dapan1121 已提交
399
// Start the plan through schedulerAsyncExecJob() and block on schdRspSem until
// schdExecCallback has filled `res`.
//
// On any failure (submission or reported result code) the job is freed when
// one was created, and the code is stored in pRequest->code and terrno. On
// success, submit/create-table requests record the row count and free the job.
// Returns pRequest->code.
int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
  tsem_init(&schdRspSem, 0, 0);

  SQueryResult     res = {.code = 0, .numOfRows = 0};
  SRequestConnInfo conn = {
      .pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
      .requestId = pRequest->requestId,
      .requestObjRefId = pRequest->self,
  };
  SSchedulerReq req = {
      .pConn = &conn,
      .pNodeList = pNodeList,
      .pDag = pDag,
      .sql = pRequest->sqlstr,
      .startTs = pRequest->metric.start,
      .fp = schdExecCallback,
      .cbParam = &res,
  };

  int32_t code = schedulerAsyncExecJob(&req, &pRequest->body.queryJob);

  pRequest->body.resInfo.execRes = res.res;

  if (TSDB_CODE_SUCCESS == code) {
    // The job was accepted: wait for the callback, then adopt its error code if any.
    tsem_wait(&schdRspSem);
    if (res.code) {
      code = res.code;
    }
  }

  if (TSDB_CODE_SUCCESS != code) {
    if (pRequest->body.queryJob != 0) {
      schedulerFreeJob(pRequest->body.queryJob, 0);
    }

    pRequest->code = code;
    terrno = code;
    return pRequest->code;
  }

  const bool isWrite = (TDMT_VND_SUBMIT == pRequest->type) || (TDMT_VND_CREATE_TABLE == pRequest->type);
  if (isWrite) {
    pRequest->body.resInfo.numOfRows = res.numOfRows;

    if (pRequest->body.queryJob != 0) {
      schedulerFreeJob(pRequest->body.queryJob, 0);
    }
  }

  pRequest->code = res.code;
  terrno = res.code;
  return pRequest->code;
}

D
dapan1121 已提交
451
// Execute the plan synchronously through schedulerExecJob().
//
// The execution result is copied into the request. When execution fails, the
// job is freed and the error is stored in pRequest->code and terrno. On
// success, submit/delete/create-table requests record the affected row count
// and free the job right away.
//
// Whenever the job is freed here, the stored job id is reset to 0 (as
// schedulerExecCb() does) so that no stale id is left on the request.
//
// Returns pRequest->code.
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
  SQueryResult     res = {0};
  SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self};
  SSchedulerReq    req = {.pConn = &conn,
                          .pNodeList = pNodeList,
                          .pDag = pDag,
                          .sql = pRequest->sqlstr,
                          .startTs = pRequest->metric.start,
                          .fp = NULL,
                          .cbParam = NULL,
                          .reqKilled = &pRequest->killed};

  int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob, &res);
  pRequest->body.resInfo.execRes = res.res;

  if (code != TSDB_CODE_SUCCESS) {
    if (pRequest->body.queryJob != 0) {
      schedulerFreeJob(pRequest->body.queryJob, 0);
      pRequest->body.queryJob = 0;
    }

    pRequest->code = code;
    terrno = code;
    return pRequest->code;
  }

  if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type ||
      TDMT_VND_CREATE_TABLE == pRequest->type) {
    pRequest->body.resInfo.numOfRows = res.numOfRows;

    if (pRequest->body.queryJob != 0) {
      schedulerFreeJob(pRequest->body.queryJob, 0);
      pRequest->body.queryJob = 0;
    }
  }

  pRequest->code = res.code;
  terrno = res.code;
  return pRequest->code;
}
X
Xiaoyu Wang 已提交
493

494 495 496
// Collect the table name and schema version of every named block in a submit
// response and pass them to catalogChkTbMetaVersion().
// Returns TSDB_CODE_SUCCESS when the response has no blocks,
// TSDB_CODE_OUT_OF_MEMORY when the work array cannot be created, otherwise the
// catalog's result.
int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
  SSubmitRsp* pSubmitRsp = (SSubmitRsp*)res;
  if (pSubmitRsp->nBlocks <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  SArray* pVersions = taosArrayInit(pSubmitRsp->nBlocks, sizeof(STbSVersion));
  if (NULL == pVersions) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t idx = 0; idx < pSubmitRsp->nBlocks; ++idx) {
    SSubmitBlkRsp* pBlk = pSubmitRsp->pBlocks + idx;

    // only blocks that carry a non-empty table name are checked
    if (NULL != pBlk->tblFName && 0 != pBlk->tblFName[0]) {
      STbSVersion tbSver = {.tbFName = pBlk->tblFName, .sver = pBlk->sver};
      taosArrayPush(pVersions, &tbSver);
    }
  }

  SRequestConnInfo conn = {
      .pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
      .requestId = pRequest->requestId,
      .requestObjRefId = pRequest->self,
      .mgmtEps = *epset,
  };

  int32_t code = catalogChkTbMetaVersion(pCatalog, &conn, pVersions);

  taosArrayDestroy(pVersions);
  return code;
}
D
dapan1121 已提交
530

531
// Convert the table-version array returned by a query into STbSVersion entries
// and pass them to catalogChkTbMetaVersion().
// Returns TSDB_CODE_SUCCESS when the array is empty, TSDB_CODE_OUT_OF_MEMORY
// when the work array cannot be created, otherwise the catalog's result.
int32_t handleQueryExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
  SArray* pTables = (SArray*)res;
  int32_t nTables = taosArrayGetSize(pTables);
  if (nTables <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  SArray* pVersions = taosArrayInit(nTables, sizeof(STbSVersion));
  if (NULL == pVersions) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t idx = 0; idx < nTables; ++idx) {
    STbVerInfo* pInfo = taosArrayGet(pTables, idx);
    STbSVersion tbSver = {.tbFName = pInfo->tbFName, .sver = pInfo->sversion, .tver = pInfo->tversion};
    taosArrayPush(pVersions, &tbSver);
  }

  SRequestConnInfo conn = {
      .pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
      .requestId = pRequest->requestId,
      .requestObjRefId = pRequest->self,
      .mgmtEps = *epset,
  };

  int32_t code = catalogChkTbMetaVersion(pCatalog, &conn, pVersions);

  taosArrayDestroy(pVersions);
  return code;
}
D
dapan1121 已提交
564

D
dapan1121 已提交
565 566
// Forward the table meta carried by an alter-table result to the catalog and
// return the catalog's code.
int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) {
  STableMetaRsp* pMetaRsp = (STableMetaRsp*)res;
  return catalogUpdateTableMeta(pCatalog, pMetaRsp);
}

H
Haojun Liao 已提交
569
// Post-process the execution result attached to the request, dispatching on
// the result's message type: alter table/stable, submit, or query.
// Returns TSDB_CODE_SUCCESS when there is no result, the catalog handle error
// if the handle cannot be obtained, TSDB_CODE_APP_ERROR for any other message
// type, otherwise the code of the selected handler.
int32_t handleQueryExecRsp(SRequestObj* pRequest) {
  if (NULL == pRequest->body.resInfo.execRes.res) {
    return TSDB_CODE_SUCCESS;
  }

  SAppInstInfo* pInst = getAppInfo(pRequest);
  SCatalog*     pCatalog = NULL;

  int32_t code = catalogGetHandle(pInst->clusterId, &pCatalog);
  if (code) {
    return code;
  }

  SEpSet         epset = getEpSet_s(&pInst->mgmtEp);
  SQueryExecRes* pExecRes = &pRequest->body.resInfo.execRes;

  if (TDMT_VND_ALTER_TABLE == pExecRes->msgType || TDMT_MND_ALTER_STB == pExecRes->msgType) {
    code = handleAlterTbExecRes(pExecRes->res, pCatalog);
  } else if (TDMT_VND_SUBMIT == pExecRes->msgType) {
    code = handleSubmitExecRes(pRequest, pExecRes->res, pCatalog, &epset);
  } else if (TDMT_VND_QUERY == pExecRes->msgType) {
    code = handleQueryExecRes(pRequest, pExecRes->res, pCatalog, &epset);
  } else {
    tscError("0x%" PRIx64 ", invalid exec result for request type %d, reqId:0x%" PRIx64, pRequest->self,
             pRequest->type, pRequest->requestId);
    code = TSDB_CODE_APP_ERROR;
  }

  return code;
}
D
dapan1121 已提交
607

608
// Scheduler completion callback; `param` is the SRequestObj of the query.
//
// Steps, in order: store the code on the request; for submit/delete/
// create-table requests record the row count and free the job; re-issue the
// query through doAsyncQuery() for errors matched by NEED_CLIENT_HANDLE_ERROR();
// on success post-process the execution result; drop cached table meta for
// request types matched by NEED_CLIENT_RM_TBLMETA_REQ(); finally invoke the
// request's query callback.
void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) {
  SRequestObj* pRequest = (SRequestObj*)param;
  pRequest->code = code;

  const bool isWrite = (TDMT_VND_SUBMIT == pRequest->type) || (TDMT_VND_DELETE == pRequest->type) ||
                       (TDMT_VND_CREATE_TABLE == pRequest->type);
  if (isWrite) {
    pRequest->body.resInfo.numOfRows = pResult->numOfRows;

    if (pRequest->body.queryJob != 0) {
      schedulerFreeJob(pRequest->body.queryJob, 0);
      pRequest->body.queryJob = 0;
    }
  }

  tscDebug("0x%" PRIx64 " enter scheduler exec cb, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code,
           tstrerror(code), pRequest->requestId);

  STscObj* pConn = pRequest->pTscObj;

  if (code != TSDB_CODE_SUCCESS) {
    if (NEED_CLIENT_HANDLE_ERROR(code)) {
      tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64,
               pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
      pRequest->prevCode = code;
      doAsyncQuery(pRequest, true);
      return;
    }
  } else {
    code = handleQueryExecRsp(pRequest);
    ASSERT(pRequest->code == TSDB_CODE_SUCCESS);
    pRequest->code = code;
  }

  tscDebug("schedulerExecCb request type %s", TMSG_INFO(pRequest->type));
  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) {
    removeMeta(pConn, pRequest->tableList);
  }

  // return to client
  pRequest->body.queryFp(pRequest->body.param, pRequest, code);
}

H
Haojun Liao 已提交
649 650 651 652 653 654 655 656 657 658 659 660 661 662 663
// Execute a parsed query synchronously according to its execution mode.
//
// pRequest   request to run; must not be NULL (it is dereferenced throughout).
// pQuery     parsed query; destroyed here unless `keepQuery` is true.
// keepQuery  when true the caller keeps ownership of `pQuery`.
// res        optional out-parameter; when non-NULL it takes over the request's
//            execution result pointer, which is then cleared on the request.
//
// Returns `pRequest`. When execution fails, the failing code itself is stored
// in pRequest->code; terrno is not used for this because not every failing
// step visible here sets it, and later steps may overwrite it.
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res) {
  int32_t code = 0;

  switch (pQuery->execMode) {
    case QUERY_EXEC_MODE_LOCAL:
      code = execLocalCmd(pRequest, pQuery);
      break;
    case QUERY_EXEC_MODE_RPC:
      code = execDdlQuery(pRequest, pQuery);
      break;
    case QUERY_EXEC_MODE_SCHEDULE: {
      SArray* pNodeList = NULL;
      code = getPlan(pRequest, pQuery, &pRequest->body.pDag, &pNodeList);
      if (TSDB_CODE_SUCCESS == code) {
        code = scheduleQuery(pRequest, pRequest->body.pDag, pNodeList);
      }
      taosArrayDestroy(pNodeList);
      break;
    }
    case QUERY_EXEC_MODE_EMPTY_RESULT:
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
      break;
    default:
      break;
  }

  if (!keepQuery) {
    qDestroyQuery(pQuery);
  }

  // The outcome of the post-processing step does not change the request code here.
  (void)handleQueryExecRsp(pRequest);

  if (TSDB_CODE_SUCCESS != code) {
    pRequest->code = code;
  }

  if (res) {
    *res = pRequest->body.resInfo.execRes.res;
    pRequest->body.resInfo.execRes.res = NULL;
  }

  return pRequest;
}

D
stmt  
dapan1121 已提交
693 694 695 696 697
// Build a request for `sql`, parse it and execute it synchronously.
// Returns NULL (with terrno set) when the request cannot be built; otherwise
// the request, whose `code` carries the parse error when parsing failed.
SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
  SRequestObj* pReq = NULL;
  int32_t      code = buildRequest(pTscObj, sql, sqlLen, &pReq);
  if (TSDB_CODE_SUCCESS != code) {
    terrno = code;
    return NULL;
  }

  SQuery* pParsed = NULL;
  code = parseSql(pReq, false, &pParsed, NULL);
  if (TSDB_CODE_SUCCESS != code) {
    pReq->code = code;
    return pReq;
  }

  pReq->stableQuery = pParsed->stableQuery;
  return launchQueryImpl(pReq, pParsed, false, NULL);
}

/*
 * Dispatch an already parsed query asynchronously according to its execution mode.
 *
 *  - LOCAL:        delegated to asyncExecLocalCmd(); returns immediately.
 *  - RPC:          delegated to asyncExecDdlQuery().
 *  - SCHEDULE:     a query plan is created and submitted to the scheduler; if plan
 *                  creation fails the user callback is invoked with the error code.
 *  - EMPTY_RESULT: the user callback is invoked right away with code 0.
 *
 * pQuery is not destroyed by this function.
 */
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) {
  int32_t code = 0;

  switch (pQuery->execMode) {
    case QUERY_EXEC_MODE_LOCAL:
      asyncExecLocalCmd(pRequest, pQuery);
      return;
    case QUERY_EXEC_MODE_RPC:
      code = asyncExecDdlQuery(pRequest, pQuery);
      break;
    case QUERY_EXEC_MODE_SCHEDULE: {
      SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));

      pRequest->type = pQuery->msgType;

      SPlanContext cxt = {.queryId = pRequest->requestId,
                          .acctId = pRequest->pTscObj->acctId,
                          .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
                          .pAstRoot = pQuery->pRoot,
                          .showRewrite = pQuery->showRewrite,
                          .pMsg = pRequest->msgBuf,
                          .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE};

      SAppInstInfo* pAppInfo = getAppInfo(pRequest);

      // code is still 0 here, so the plan is always attempted
      code = qCreateQueryPlan(&cxt, &pRequest->body.pDag, pNodeList);

      if (TSDB_CODE_SUCCESS == code) {
        SRequestConnInfo conn = {
            .pTrans = pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self};
        SSchedulerReq req = {.pConn = &conn,
                             .pNodeList = pNodeList,
                             .pDag = pRequest->body.pDag,
                             .sql = pRequest->sqlstr,
                             .startTs = pRequest->metric.start,
                             .fp = schedulerExecCb,
                             .cbParam = pRequest,
                             .reqKilled = &pRequest->killed};
        code = schedulerAsyncExecJob(&req, &pRequest->body.queryJob);
      } else {
        // the plan failure is logged exactly once, then reported to the user callback
        tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
                 pRequest->requestId);
        pRequest->body.queryFp(pRequest->body.param, pRequest, code);
      }

      // todo not to be released here
      taosArrayDestroy(pNodeList);
      break;
    }
    case QUERY_EXEC_MODE_EMPTY_RESULT:
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
      pRequest->body.queryFp(pRequest->body.param, pRequest, 0);
      break;
    default:
      break;
  }

  if (NULL != pRequest && TSDB_CODE_SUCCESS != code) {
    pRequest->code = terrno;
  }
}

D
dapan1121 已提交
785
/*
 * Refresh the catalog entries for every database and table recorded on the request.
 *
 * Returns TSDB_CODE_QRY_APP_ERROR when the request lists neither databases nor tables,
 * the first error reported by the catalog otherwise, or the success code when every
 * refresh succeeded.
 */
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
  int32_t numOfDbs = taosArrayGetSize(pRequest->dbList);
  int32_t numOfTables = taosArrayGetSize(pRequest->tableList);
  if (numOfDbs <= 0 && numOfTables <= 0) {
    return TSDB_CODE_QRY_APP_ERROR;
  }

  SCatalog* pCtg = NULL;
  int32_t   rc = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCtg);
  if (TSDB_CODE_SUCCESS != rc) {
    return rc;
  }

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  // databases first, stop at the first failure
  for (int32_t d = 0; d < numOfDbs && TSDB_CODE_SUCCESS == rc; ++d) {
    char* dbFName = taosArrayGet(pRequest->dbList, d);
    rc = catalogRefreshDBVgInfo(pCtg, &conn, dbFName);
  }
  if (TSDB_CODE_SUCCESS != rc) {
    return rc;
  }

  // then the tables
  for (int32_t t = 0; t < numOfTables && TSDB_CODE_SUCCESS == rc; ++t) {
    SName* pName = taosArrayGet(pRequest->tableList, t);
    rc = catalogRefreshTableMeta(pCtg, &conn, pName, -1);
  }

  return rc;
}

D
dapan1121 已提交
826 827
/*
 * Drop the cached table meta of every table named in tbList from the catalog of
 * the cluster this connection belongs to.
 *
 * Returns the catalogGetHandle() error if the catalog cannot be obtained,
 * TSDB_CODE_SUCCESS otherwise (individual removal results are not inspected).
 */
int32_t removeMeta(STscObj* pTscObj, SArray* tbList) {
  SCatalog* pCtg = NULL;
  int32_t   numOfTables = taosArrayGetSize(tbList);

  int32_t rc = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCtg);
  if (TSDB_CODE_SUCCESS != rc) {
    return rc;
  }

  int32_t idx = 0;
  while (idx < numOfTables) {
    catalogRemoveTableMeta(pCtg, (SName*)taosArrayGet(tbList, idx));
    ++idx;
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
842 843
/*
 * Execute `sql` synchronously, retrying after a meta refresh while the returned
 * error is one the client is expected to handle (NEED_CLIENT_HANDLE_ERROR).
 *
 * Returns the last request object, or NULL when launchQuery() could not create
 * one. The caller inspects pRequest->code for the final status.
 */
SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
  SRequestObj* pRequest = NULL;
  int32_t      retryNum = 0;
  int32_t      code = 0;

  do {
    // release the request of the previous attempt before retrying
    destroyRequest(pRequest);
    pRequest = launchQuery(pTscObj, sql, sqlLen);
    if (pRequest == NULL || TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) {
      break;
    }

    code = refreshMeta(pTscObj, pRequest);
    if (code) {
      pRequest->code = code;
      break;
    }
  } while (retryNum++ < REQUEST_TOTAL_EXEC_TIMES);

  // launchQuery() may have returned NULL, so guard before touching the request
  if (pRequest != NULL && NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) {
    removeMeta(pTscObj, pRequest->tableList);
  }

  return pRequest;
}

S
Shengliang Guan 已提交
868 869
/*
 * Initialize the management endpoint set from up to two "fqdn:port" strings.
 *
 * Empty or NULL endpoint strings are skipped. Returns 0 when at least one
 * endpoint was stored; otherwise returns a non-zero value with terrno set to
 * TSDB_CODE_TSC_INVALID_FQDN (endpoint too long, unparsable, or none given).
 */
int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
  pEpSet->version = 0;

  // init mnode ip set
  SEpSet* mgmtEpSet = &(pEpSet->epSet);
  mgmtEpSet->numOfEps = 0;
  mgmtEpSet->inUse = 0;

  if (firstEp && firstEp[0] != 0) {
    if (strlen(firstEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
    }

    int32_t code = taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[0]);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return terrno;
    }

    mgmtEpSet->numOfEps++;
  }

  if (secondEp && secondEp[0] != 0) {
    if (strlen(secondEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
    }

    // treat a parse failure like the first endpoint does instead of counting a bad entry
    int32_t code = taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return terrno;
    }

    mgmtEpSet->numOfEps++;
  }

  if (mgmtEpSet->numOfEps == 0) {
    terrno = TSDB_CODE_TSC_INVALID_FQDN;
    return -1;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
909
STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
L
Liu Jicong 已提交
910
                         SAppInstInfo* pAppInfo, int connType) {
911
  STscObj* pTscObj = createTscObj(user, auth, db, connType, pAppInfo);
912 913 914 915 916
  if (NULL == pTscObj) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return pTscObj;
  }

917
  SRequestObj* pRequest = createRequest(pTscObj, TDMT_MND_CONNECT);
918 919 920
  if (pRequest == NULL) {
    destroyTscObj(pTscObj);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
921 922 923
    return NULL;
  }

924
  SMsgSendInfo* body = buildConnectMsg(pRequest);
925 926

  int64_t transporterId = 0;
H
Haojun Liao 已提交
927
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
928 929 930

  tsem_wait(&pRequest->body.rspSem);
  if (pRequest->code != TSDB_CODE_SUCCESS) {
dengyihao's avatar
dengyihao 已提交
931 932
    const char* errorMsg =
        (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
933
    fprintf(stderr, "failed to connect to server, reason: %s\n\n", errorMsg);
934

935
    terrno = pRequest->code;
936
    destroyRequest(pRequest);
937
    taos_close_internal(pTscObj);
938 939
    pTscObj = NULL;
  } else {
D
dapan1121 已提交
940
    tscDebug("0x%" PRIx64 " connection is opening, connId:%u, dnodeConn:%p, reqId:0x%" PRIx64, pTscObj->id,
dengyihao's avatar
dengyihao 已提交
941
             pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId);
942 943 944 945 946 947
    destroyRequest(pRequest);
  }

  return pTscObj;
}

948
/*
 * Build the TDMT_MND_CONNECT message for the connection that owns pRequest.
 *
 * Returns a heap-allocated SMsgSendInfo whose msgInfo.pData holds the serialized
 * SConnectReq, or NULL with terrno set to TSDB_CODE_TSC_OUT_OF_MEMORY when an
 * allocation fails.
 */
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  pMsgSendInfo->msgType = TDMT_MND_CONNECT;

  pMsgSendInfo->requestObjRefId = pRequest->self;
  pMsgSendInfo->requestId = pRequest->requestId;
  pMsgSendInfo->fp = getMsgRspHandle(pMsgSendInfo->msgType);
  pMsgSendInfo->param = pRequest;

  SConnectReq connectReq = {0};
  STscObj*    pObj = pRequest->pTscObj;

  // getDbOfConnection() returns a private copy (or NULL when no db is selected)
  char* db = getDbOfConnection(pObj);
  if (db != NULL) {
    tstrncpy(connectReq.db, db, sizeof(connectReq.db));
  }
  taosMemoryFreeClear(db);

  connectReq.connType = pObj->connType;
  connectReq.pid = appInfo.pid;
  connectReq.startTime = appInfo.startTime;

  tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));
  tstrncpy(connectReq.user, pObj->user, sizeof(connectReq.user));
  tstrncpy(connectReq.passwd, pObj->pass, sizeof(connectReq.passwd));

  // first pass computes the encoded size, second pass writes into the buffer
  int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
  void*   pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    taosMemoryFree(pMsgSendInfo);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }
  tSerializeSConnectReq(pReq, contLen, &connectReq);

  pMsgSendInfo->msgInfo.len = contLen;
  pMsgSendInfo->msgInfo.pData = pReq;
  return pMsgSendInfo;
}

988
/*
 * Release a send-info object together with the buffers it references
 * (target database name and message payload).
 */
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
  assert(NULL != pMsgBody);

  // members first, then the container itself
  taosMemoryFreeClear(pMsgBody->target.dbFName);
  taosMemoryFreeClear(pMsgBody->msgInfo.pData);

  taosMemoryFreeClear(pMsgBody);
}
994

D
dapan1121 已提交
995 996 997 998
/*
 * Apply an endpoint-set change reported by the server to the matching client
 * side cache: the application's mnode endpoint set for TARGET_TYPE_MNODE, or
 * the catalog's vgroup endpoint set for TARGET_TYPE_VNODE. Does nothing when
 * pEpSet is NULL; other target types are only logged.
 */
void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg, SEpSet* pEpSet) {
  if (pEpSet == NULL) {
    return;
  }

  switch (pSendInfo->target.type) {
    case TARGET_TYPE_MNODE: {
      if (pTscObj == NULL) {
        tscError("mnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
                 TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
        return;
      }

      SEpSet* pCurSet = &pTscObj->pAppInfo->mgmtEp.epSet;
      SEp*    pCurEp = &pCurSet->eps[pCurSet->inUse];
      SEp*    pNextEp = &pEpSet->eps[pEpSet->inUse];
      tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in client", pCurSet->inUse, pCurSet->numOfEps,
               pCurEp->fqdn, pCurEp->port, pEpSet->inUse, pEpSet->numOfEps, pNextEp->fqdn, pNextEp->port);
      updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
      break;
    }
    case TARGET_TYPE_VNODE: {
      if (pTscObj == NULL) {
        tscError("vnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
                 TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
        return;
      }

      SCatalog* pCtg = NULL;
      int32_t   rc = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCtg);
      if (TSDB_CODE_SUCCESS != rc) {
        tscError("fail to get catalog handle, clusterId:%" PRIx64 ", error %s", pTscObj->pAppInfo->clusterId,
                 tstrerror(rc));
        return;
      }

      catalogUpdateVgEpSet(pCtg, pSendInfo->target.dbFName, pSendInfo->target.vgId, pEpSet);
      // the db name is released (and cleared) as soon as it has been consumed
      taosMemoryFreeClear(pSendInfo->target.dbFName);
      break;
    }
    default:
      tscDebug("epset changed, not updated, msgType %s", TMSG_INFO(pMsg->msgType));
      break;
  }
}

1040
/*
 * RPC response entry point: logs the response, applies endpoint-set updates,
 * copies the payload into a private buffer and invokes the response handler
 * stored in the send-info object. The RPC payload and the send-info object are
 * released before returning.
 */
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
  assert(pMsg->info.ahandle != NULL);
  STscObj* pTscObj = NULL;

  tscDebug("processMsgFromServer message: %s, code: %s", TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code));

  if (pSendInfo->requestObjRefId != 0) {
    SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
    if (pRequest) {
      assert(pRequest->self == pSendInfo->requestObjRefId);

      pRequest->metric.rsp = taosGetTimestampUs();
      pTscObj = pRequest->pTscObj;

      // keep the microsecond difference in 64 bits; only the millisecond value is narrowed
      // so that long-running requests do not overflow before the division
      int64_t elapsedUs = pRequest->metric.rsp - pRequest->metric.start;
      int32_t elapsedMs = (int32_t)(elapsedUs / 1000);
      if (pMsg->code == TSDB_CODE_SUCCESS) {
        tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%" PRIx64, pRequest->self,
                 TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsedMs, pRequest->requestId);
      } else {
        tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%" PRIx64, pRequest->self,
                 TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsedMs, pRequest->requestId);
      }

      taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
    }
  }

  updateTargetEpSet(pSendInfo, pTscObj, pMsg, pEpSet);

  SDataBuf buf = {.len = pMsg->contLen, .pData = NULL, .handle = pMsg->info.handle};

  if (pMsg->contLen > 0) {
    // the handler gets its own copy because pMsg->pCont is freed below
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
    if (buf.pData == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    }
  }

  pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pSendInfo);
}
1089

dengyihao's avatar
dengyihao 已提交
1090
TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) {
1091 1092 1093 1094 1095 1096 1097 1098 1099 1100
  tscDebug("try to connect to %s:%u by auth, user:%s db:%s", ip, port, user, db);
  if (user == NULL) {
    user = TSDB_DEFAULT_USER;
  }

  if (auth == NULL) {
    tscError("No auth info is given, failed to connect to server");
    return NULL;
  }

1101 1102
  STscObj* pObj = taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY);
  if (pObj) {
D
dapan1121 已提交
1103 1104 1105
    uint64_t *id = taosMemoryMalloc(sizeof(uint64_t));
    *id = pObj->id;
    return (TAOS*)id;
1106 1107
  }
  
D
dapan1121 已提交
1108
  return NULL;
1109 1110
}

dengyihao's avatar
dengyihao 已提交
1111 1112 1113
TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen,
                     const char* db, int dbLen, uint16_t port) {
  char ipStr[TSDB_EP_LEN] = {0};
1114
  char dbStr[TSDB_DB_NAME_LEN] = {0};
dengyihao's avatar
dengyihao 已提交
1115 1116
  char userStr[TSDB_USER_LEN] = {0};
  char passStr[TSDB_PASSWORD_LEN] = {0};
1117

dengyihao's avatar
dengyihao 已提交
1118
  strncpy(ipStr, ip, TMIN(TSDB_EP_LEN - 1, ipLen));
dengyihao's avatar
dengyihao 已提交
1119 1120
  strncpy(userStr, user, TMIN(TSDB_USER_LEN - 1, userLen));
  strncpy(passStr, pass, TMIN(TSDB_PASSWORD_LEN - 1, passLen));
dengyihao's avatar
dengyihao 已提交
1121
  strncpy(dbStr, db, TMIN(TSDB_DB_NAME_LEN - 1, dbLen));
1122
  return taos_connect(ipStr, userStr, passStr, dbStr, port);
1123 1124
}

L
Liu Jicong 已提交
1125
/*
 * Point row[] / length[] of the result info at the cells of the current row
 * (pResultInfo->current). NULL cells get a NULL pointer and length 0.
 */
void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
  for (int32_t c = 0; c < pResultInfo->numOfCols; ++c) {
    SResultColumn* pCol = &pResultInfo->pCol[c];
    int32_t        type = pResultInfo->fields[c].type;
    int32_t        bytes = pResultInfo->fields[c].bytes;

    bool isVar = IS_VAR_DATA_TYPE(type);
    bool hasValue;
    if (isVar) {
      // variable-length columns mark NULL cells with offset -1
      hasValue = !IS_VAR_NULL_TYPE(type, bytes) && pCol->offset[pResultInfo->current] != -1;
    } else {
      // fixed-length columns use the null bitmap
      hasValue = !colDataIsNull_f(pCol->nullbitmap, pResultInfo->current);
    }

    if (!hasValue) {
      pResultInfo->row[c] = NULL;
      pResultInfo->length[c] = 0;
      continue;
    }

    if (isVar) {
      char* pCell = pCol->offset[pResultInfo->current] + pCol->pData;
      pResultInfo->length[c] = varDataLen(pCell);
      pResultInfo->row[c] = varDataVal(pCell);
    } else {
      pResultInfo->row[c] = pCol->pData + bytes * pResultInfo->current;
      pResultInfo->length[c] = bytes;
    }
  }
}

1154
/*
 * Synchronous fetch: when the current block is exhausted, pull the next one from
 * the scheduler and decode it. With setupOneRowPtr set, row[] is positioned on the
 * next row and the cursor advances by one.
 *
 * Returns pResultInfo->row, or NULL when there is no more data or an error
 * occurred (pRequest->code holds the error).
 */
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
  assert(pRequest != NULL);

  SReqResultInfo* pInfo = &pRequest->body.resInfo;
  bool            needFetch = (NULL == pInfo->pData) || (pInfo->current >= pInfo->numOfRows);

  if (needFetch) {
    // All data has returned to App already, no need to try again
    if (pInfo->completed) {
      pInfo->numOfRows = 0;
      return NULL;
    }

    pRequest->code = schedulerFetchRows(pRequest->body.queryJob, (void**)&pInfo->pData);
    if (TSDB_CODE_SUCCESS == pRequest->code) {
      pRequest->code = setQueryResultFromRsp(pInfo, (SRetrieveTableRsp*)pInfo->pData, convertUcs4, true);
    }

    if (TSDB_CODE_SUCCESS != pRequest->code) {
      pInfo->numOfRows = 0;
      return NULL;
    }

    tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
             pRequest->self, pInfo->numOfRows, pInfo->totalRows, pInfo->completed, pRequest->requestId);

    if (0 == pInfo->numOfRows) {
      return NULL;
    }
  }

  if (setupOneRowPtr) {
    doSetOneRowPtr(pInfo);
    pInfo->current += 1;
  }

  return pInfo->row;
}

H
Haojun Liao 已提交
1195 1196 1197
/* Fetch callback used by the blocking fetch path: wakes up the waiter on the param's semaphore. */
static void syncFetchFn(void* param, TAOS_RES* res, int32_t numOfRows) {
  tsem_post(&((SSyncQueryParam*)param)->sem);
}
D
dapan1121 已提交
1199

1200
/*
 * Blocking fetch built on top of the asynchronous API: when the current block is
 * exhausted it issues taos_fetch_rows_a() and waits on the semaphore that
 * syncFetchFn posts. With setupOneRowPtr set (and rows available without error),
 * row[] is positioned on the next row and the cursor advances by one.
 *
 * Returns pResultInfo->row, or NULL when the result set is already complete.
 */
void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
  assert(pRequest != NULL);

  SReqResultInfo* pInfo = &pRequest->body.resInfo;
  bool            exhausted = (NULL == pInfo->pData) || (pInfo->current >= pInfo->numOfRows);

  if (exhausted) {
    // All data has returned to App already, no need to try again
    if (pInfo->completed) {
      pInfo->numOfRows = 0;
      return NULL;
    }

    SSyncQueryParam* pSync = pRequest->body.param;

    // convert ucs4 to native multi-bytes string
    pInfo->convertUcs4 = convertUcs4;

    taos_fetch_rows_a(pRequest, syncFetchFn, pSync);
    tsem_wait(&pSync->sem);
  }

  bool rowAvailable = (TSDB_CODE_SUCCESS == pRequest->code) && (pInfo->numOfRows > 0);
  if (rowAvailable && setupOneRowPtr) {
    doSetOneRowPtr(pInfo);
    pInfo->current += 1;
  }

  return pInfo->row;
}

1228
/*
 * Lazily allocate the per-column bookkeeping arrays of a result info
 * (row, pCol, length, convertBuf), each with numOfCols zeroed entries.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when any allocation
 * fails; in that case nothing stays allocated, so a later call starts over.
 */
static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
  if (pResInfo->row != NULL) {
    return TSDB_CODE_SUCCESS;  // already prepared
  }

  pResInfo->row = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
  pResInfo->pCol = taosMemoryCalloc(pResInfo->numOfCols, sizeof(SResultColumn));
  pResInfo->length = taosMemoryCalloc(pResInfo->numOfCols, sizeof(int32_t));
  pResInfo->convertBuf = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);

  if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL || pResInfo->convertBuf == NULL) {
    // roll back partial allocations; otherwise row != NULL would make the next call
    // skip allocation while some of the other arrays are still NULL
    taosMemoryFreeClear(pResInfo->row);
    taosMemoryFreeClear(pResInfo->pCol);
    taosMemoryFreeClear(pResInfo->length);
    taosMemoryFreeClear(pResInfo->convertBuf);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

1243 1244 1245 1246
static char* parseTagDatatoJson(void* p) {
  char*  string = NULL;
  cJSON* json = cJSON_CreateObject();
  if (json == NULL) {
wmmhello's avatar
wmmhello 已提交
1247 1248 1249
    goto end;
  }

C
Cary Xu 已提交
1250 1251 1252 1253 1254 1255
  SArray* pTagVals = NULL;
  if (tTagToValArray((const STag*)p, &pTagVals) != 0) {
    goto end;
  }

  int16_t nCols = taosArrayGetSize(pTagVals);
1256
  char    tagJsonKey[256] = {0};
wmmhello's avatar
wmmhello 已提交
1257
  for (int j = 0; j < nCols; ++j) {
C
Cary Xu 已提交
1258
    STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
wmmhello's avatar
wmmhello 已提交
1259 1260
    // json key  encode by binary
    memset(tagJsonKey, 0, sizeof(tagJsonKey));
wmmhello's avatar
wmmhello 已提交
1261
    memcpy(tagJsonKey, pTagVal->pKey, strlen(pTagVal->pKey));
wmmhello's avatar
wmmhello 已提交
1262
    // json value
L
Liu Jicong 已提交
1263
    char type = pTagVal->type;
1264
    if (type == TSDB_DATA_TYPE_NULL) {
wmmhello's avatar
wmmhello 已提交
1265
      cJSON* value = cJSON_CreateNull();
1266
      if (value == NULL) {
wmmhello's avatar
wmmhello 已提交
1267 1268 1269
        goto end;
      }
      cJSON_AddItemToObject(json, tagJsonKey, value);
1270
    } else if (type == TSDB_DATA_TYPE_NCHAR) {
wmmhello's avatar
wmmhello 已提交
1271
      cJSON* value = NULL;
wmmhello's avatar
wmmhello 已提交
1272 1273 1274
      if (pTagVal->nData > 0) {
        char*   tagJsonValue = taosMemoryCalloc(pTagVal->nData, 1);
        int32_t length = taosUcs4ToMbs((TdUcs4*)pTagVal->pData, pTagVal->nData, tagJsonValue);
wmmhello's avatar
wmmhello 已提交
1275
        if (length < 0) {
L
Liu Jicong 已提交
1276 1277
          tscError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC, tsCharset,
                   pTagVal->pData);
wmmhello's avatar
wmmhello 已提交
1278 1279 1280 1281 1282
          taosMemoryFree(tagJsonValue);
          goto end;
        }
        value = cJSON_CreateString(tagJsonValue);
        taosMemoryFree(tagJsonValue);
1283
        if (value == NULL) {
wmmhello's avatar
wmmhello 已提交
1284 1285
          goto end;
        }
wmmhello's avatar
wmmhello 已提交
1286
      } else if (pTagVal->nData == 0) {
wmmhello's avatar
wmmhello 已提交
1287
        value = cJSON_CreateString("");
1288
      } else {
wmmhello's avatar
wmmhello 已提交
1289 1290 1291 1292
        ASSERT(0);
      }

      cJSON_AddItemToObject(json, tagJsonKey, value);
1293
    } else if (type == TSDB_DATA_TYPE_DOUBLE) {
wmmhello's avatar
wmmhello 已提交
1294
      double jsonVd = *(double*)(&pTagVal->i64);
wmmhello's avatar
wmmhello 已提交
1295
      cJSON* value = cJSON_CreateNumber(jsonVd);
1296
      if (value == NULL) {
wmmhello's avatar
wmmhello 已提交
1297 1298 1299
        goto end;
      }
      cJSON_AddItemToObject(json, tagJsonKey, value);
1300
    } else if (type == TSDB_DATA_TYPE_BOOL) {
wmmhello's avatar
wmmhello 已提交
1301
      char   jsonVd = *(char*)(&pTagVal->i64);
wmmhello's avatar
wmmhello 已提交
1302
      cJSON* value = cJSON_CreateBool(jsonVd);
1303
      if (value == NULL) {
wmmhello's avatar
wmmhello 已提交
1304 1305 1306
        goto end;
      }
      cJSON_AddItemToObject(json, tagJsonKey, value);
1307
    } else {
wmmhello's avatar
wmmhello 已提交
1308
      ASSERT(0);
wmmhello's avatar
wmmhello 已提交
1309 1310 1311 1312 1313 1314 1315 1316
    }
  }
  string = cJSON_PrintUnformatted(json);
end:
  cJSON_Delete(json);
  return string;
}

1317 1318 1319 1320 1321
/*
 * Rewrite NCHAR and JSON columns of the current block into their client-side
 * representation. The converted data is written into a per-column buffer
 * (convertBuf[i]); the column's offsets, pData and row[i] are redirected to it.
 *
 *  - NCHAR: UCS-4 values are converted to multi-byte strings.
 *  - JSON:  every cell is rendered as text (null / object / "string" / number / bool).
 *
 * A value that cannot be converted is logged and emitted as an empty value.
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when a buffer cannot be (re)allocated.
 */
static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int32_t numOfCols, int32_t* colLength) {
  for (int32_t i = 0; i < numOfCols; ++i) {
    int32_t type = pResultInfo->fields[i].type;
    int32_t bytes = pResultInfo->fields[i].bytes;

    if (type == TSDB_DATA_TYPE_NCHAR && colLength[i] > 0) {
      char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]);
      if (p == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      pResultInfo->convertBuf[i] = p;

      SResultColumn* pCol = &pResultInfo->pCol[i];
      for (int32_t j = 0; j < numOfRows; ++j) {
        if (pCol->offset[j] != -1) {
          char* pStart = pCol->offset[j] + pCol->pData;

          int32_t len = taosUcs4ToMbs((TdUcs4*)varDataVal(pStart), varDataLen(pStart), varDataVal(p));
          if (len < 0) {
            // same policy as the JSON branch below: log and emit an empty value
            // instead of storing a negative length and moving the cursor backwards
            tscError("charset:%s to %s. convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset);
            len = 0;
          }
          ASSERT(len <= bytes);
          ASSERT((p + len) < (pResultInfo->convertBuf[i] + colLength[i]));

          varDataSetLen(p, len);
          pCol->offset[j] = (p - pResultInfo->convertBuf[i]);
          p += (len + VARSTR_HEADER_SIZE);
        }
      }

      pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
      pResultInfo->row[i] = pResultInfo->pCol[i].pData;
    } else if (type == TSDB_DATA_TYPE_JSON && colLength[i] > 0) {
      char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]);
      if (p == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      pResultInfo->convertBuf[i] = p;
      int32_t        len = 0;  // bytes of convertBuf[i] used so far
      SResultColumn* pCol = &pResultInfo->pCol[i];
      for (int32_t j = 0; j < numOfRows; ++j) {
        if (pCol->offset[j] != -1) {
          char* pStart = pCol->offset[j] + pCol->pData;

          // first byte is the inner type tag, the value follows
          int32_t jsonInnerType = *pStart;
          char*   jsonInnerData = pStart + CHAR_BYTES;
          char    dst[TSDB_MAX_JSON_TAG_LEN] = {0};
          if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
            sprintf(varDataVal(dst), "%s", TSDB_DATA_NULL_STR_L);
            varDataSetLen(dst, strlen(varDataVal(dst)));
          } else if (jsonInnerType == TD_TAG_JSON) {
            char* jsonString = parseTagDatatoJson(pStart);
            if (jsonString == NULL) {
              // parseTagDatatoJson() returns NULL on failure; emit an empty value
              // rather than handing NULL to STR_TO_VARSTR
              tscError("failed to convert json tag to string");
              varDataSetLen(dst, 0);
            } else {
              STR_TO_VARSTR(dst, jsonString);
              taosMemoryFree(jsonString);
            }
          } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
            *(char*)varDataVal(dst) = '\"';
            int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData),
                                           varDataVal(dst) + CHAR_BYTES);
            if (length <= 0) {
              tscError("charset:%s to %s. convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset);
              length = 0;
            }
            varDataSetLen(dst, length + CHAR_BYTES * 2);
            *(char*)POINTER_SHIFT(varDataVal(dst), length + CHAR_BYTES) = '\"';
          } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
            double jsonVd = *(double*)(jsonInnerData);
            sprintf(varDataVal(dst), "%.9lf", jsonVd);
            varDataSetLen(dst, strlen(varDataVal(dst)));
          } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
            sprintf(varDataVal(dst), "%s", (*((char*)jsonInnerData) == 1) ? "true" : "false");
            varDataSetLen(dst, strlen(varDataVal(dst)));
          } else {
            ASSERT(0);
          }

          // the rendered text may be longer than the encoded column: grow on demand
          if (len + varDataTLen(dst) > colLength[i]) {
            p = taosMemoryRealloc(pResultInfo->convertBuf[i], len + varDataTLen(dst));
            if (p == NULL) {
              return TSDB_CODE_OUT_OF_MEMORY;
            }

            pResultInfo->convertBuf[i] = p;
          }
          p = pResultInfo->convertBuf[i] + len;
          memcpy(p, dst, varDataTLen(dst));
          pCol->offset[j] = len;
          len += varDataTLen(dst);
        }
      }

      pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
      pResultInfo->row[i] = pResultInfo->pCol[i].pData;
    }
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1414 1415
/*
 * Walk the raw data block in pResultInfo->pData and set up the per-column
 * pointers (offset / null bitmap, data start, row[], length[]).
 *
 * Layout as consumed here: int32 total length, uint64 group id, one
 * (int16 type, int32 bytes) pair per column, one int32 length per column,
 * then the column payloads. The per-column lengths are converted with htonl()
 * in place.
 *
 * With convertUcs4 set, NCHAR/JSON columns are converted afterwards.
 * Returns TSDB_CODE_SUCCESS or the error of the preparation / conversion step.
 */
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows,
                         bool convertUcs4) {
  assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL);
  if (0 == numOfRows) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t rc = doPrepareResPtr(pResultInfo);
  if (TSDB_CODE_SUCCESS != rc) {
    return rc;
  }

  char* pHead = (char*)pResultInfo->pData;

  int32_t dataLen = *(int32_t*)pHead;
  pHead += sizeof(int32_t);

  // group id: not needed here
  pHead += sizeof(uint64_t);

  // per-column (type, bytes) schema entries: skipped without validation
  pHead += numOfCols * (sizeof(int16_t) + sizeof(int32_t));

  int32_t* colLength = (int32_t*)pHead;
  char*    pCursor = pHead + sizeof(int32_t) * numOfCols;

  for (int32_t i = 0; i < numOfCols; ++i) {
    colLength[i] = htonl(colLength[i]);
    ASSERT(colLength[i] < dataLen);

    SResultColumn* pCol = &pResultInfo->pCol[i];
    if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
      // variable-length column: an offset per row precedes the data
      pCol->offset = (int32_t*)pCursor;
      pCursor += numOfRows * sizeof(int32_t);
    } else {
      // fixed-length column: a null bitmap precedes the data
      pCol->nullbitmap = pCursor;
      pCursor += BitmapLen(pResultInfo->numOfRows);
    }

    pCol->pData = pCursor;
    pResultInfo->length[i] = pResultInfo->fields[i].bytes;
    pResultInfo->row[i] = pCol->pData;

    pCursor += colLength[i];
  }

  // convert UCS4-LE encoded character to native multi-bytes character in current data block.
  if (!convertUcs4) {
    return TSDB_CODE_SUCCESS;
  }
  return doConvertUCS4(pResultInfo, numOfRows, numOfCols, colLength);
}

H
Haojun Liao 已提交
1476
/*
 * Return a heap-allocated copy of the connection's current database name, taken
 * under the connection mutex, or NULL when no database is selected.
 * The caller owns the returned string.
 */
char* getDbOfConnection(STscObj* pObj) {
  char* dbCopy = NULL;

  taosThreadMutexLock(&pObj->mutex);
  if (pObj->db[0] != '\0') {
    dbCopy = strndup(pObj->db, tListLen(pObj->db));
  }
  taosThreadMutexUnlock(&pObj->mutex);

  return dbCopy;
}

/**
 * Store `db` as the current database name of the connection.
 *
 * Both arguments must be non-NULL (enforced by assert). The copy into
 * pTscObj->db is bounded by tListLen(pTscObj->db) and is performed while
 * pTscObj->mutex is held.
 */
void setConnectionDB(STscObj* pTscObj, const char* db) {
  assert(db != NULL && pTscObj != NULL);

  const size_t dbCapacity = tListLen(pTscObj->db);

  taosThreadMutexLock(&pTscObj->mutex);
  tstrncpy(pTscObj->db, db, dbCapacity);
  taosThreadMutexUnlock(&pTscObj->mutex);
}
S
Shengliang Guan 已提交
1494

H
Haojun Liao 已提交
1495 1496 1497 1498 1499 1500 1501 1502 1503 1504
/**
 * Clear the current database name of the connection.
 *
 * A NULL connection is ignored. Otherwise the first byte of pTscObj->db is
 * set to the string terminator while pTscObj->mutex is held, leaving an
 * empty name.
 */
void resetConnectDB(STscObj* pTscObj) {
  if (pTscObj != NULL) {
    taosThreadMutexLock(&pTscObj->mutex);
    pTscObj->db[0] = '\0';
    taosThreadMutexUnlock(&pTscObj->mutex);
  }
}

L
Liu Jicong 已提交
1505 1506
/**
 * Attach a retrieve-table response to a result-info object and set up the
 * per-column data pointers for it.
 *
 * @param pResultInfo   result state to update; must not be NULL
 * @param pRsp          response message to attach; must not be NULL
 * @param convertUcs4   forwarded unchanged to setResultDataPtr()
 * @param freeAfterUse  when true, the previously attached response message
 *                      (pResultInfo->pRspMsg) is released before the new one
 *                      is attached
 * @return the value returned by setResultDataPtr()
 *
 * NOTE(review): numOfRows and compLen are passed through htonl(), presumably
 * because the response carries them in network byte order — confirm against
 * the sender.
 */
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4,
                              bool freeAfterUse) {
  assert(pResultInfo != NULL && pRsp != NULL);

  // Drop the previous response buffer first, if the caller asked for that.
  if (freeAfterUse) {
    taosMemoryFreeClear(pResultInfo->pRspMsg);
  }

  // Point the result info at the new response and its payload.
  pResultInfo->pRspMsg = (const char*)pRsp;
  pResultInfo->pData = (void*)pRsp->data;

  // Copy the response header fields.
  pResultInfo->numOfRows = htonl(pRsp->numOfRows);
  pResultInfo->payloadLen = htonl(pRsp->compLen);
  pResultInfo->precision = pRsp->precision;
  pResultInfo->completed = (pRsp->completed == 1);

  // Restart iteration on the new block and account for its rows.
  pResultInfo->current = 0;
  pResultInfo->totalRows += pResultInfo->numOfRows;

  // TODO handle the compressed case
  return setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows,
                          convertUcs4);
}
S
Shengliang Guan 已提交
1524 1525 1526 1527 1528 1529

TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* details, int maxlen) {
  TSDB_SERVER_STATUS code = TSDB_SRV_STATUS_UNAVAILABLE;
  void*              clientRpc = NULL;
  SServerStatusRsp   statusRsp = {0};
  SEpSet             epSet = {.inUse = 0, .numOfEps = 1};
S
Shengliang Guan 已提交
1530
  SRpcMsg            rpcMsg = {.info.ahandle = (void*)0x9526, .msgType = TDMT_DND_SERVER_STATUS};
S
Shengliang Guan 已提交
1531 1532 1533 1534 1535 1536 1537 1538 1539 1540 1541 1542 1543 1544 1545 1546 1547 1548
  SRpcMsg            rpcRsp = {0};
  SRpcInit           rpcInit = {0};
  char               pass[TSDB_PASSWORD_LEN + 1] = {0};

  rpcInit.label = "CHK";
  rpcInit.numOfThreads = 1;
  rpcInit.cfp = NULL;
  rpcInit.sessions = 16;
  rpcInit.connType = TAOS_CONN_CLIENT;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.user = "_dnd";

  clientRpc = rpcOpen(&rpcInit);
  if (clientRpc == NULL) {
    tscError("failed to init server status client");
    goto _OVER;
  }

S
Shengliang Guan 已提交
1549 1550 1551 1552 1553 1554 1555 1556
  if (fqdn == NULL) {
    fqdn = tsLocalFqdn;
  }

  if (port == 0) {
    port = tsServerPort;
  }

S
Shengliang Guan 已提交
1557 1558 1559 1560 1561 1562 1563 1564 1565 1566 1567 1568 1569 1570 1571
  tstrncpy(epSet.eps[0].fqdn, fqdn, TSDB_FQDN_LEN);
  epSet.eps[0].port = (uint16_t)port;
  rpcSendRecv(clientRpc, &epSet, &rpcMsg, &rpcRsp);

  if (rpcRsp.code != 0 || rpcRsp.contLen <= 0 || rpcRsp.pCont == NULL) {
    tscError("failed to send server status req since %s", terrstr());
    goto _OVER;
  }

  if (tDeserializeSServerStatusRsp(rpcRsp.pCont, rpcRsp.contLen, &statusRsp) != 0) {
    tscError("failed to parse server status rsp since %s", terrstr());
    goto _OVER;
  }

  code = statusRsp.statusCode;
1572
  if (details != NULL) {
S
Shengliang Guan 已提交
1573 1574 1575 1576 1577 1578 1579 1580 1581 1582 1583 1584
    tstrncpy(details, statusRsp.details, maxlen);
  }

_OVER:
  if (clientRpc != NULL) {
    rpcClose(clientRpc);
  }
  if (rpcRsp.pCont != NULL) {
    rpcFreeCont(rpcRsp.pCont);
  }
  return code;
}