clientImpl.c 68.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15

16
#include "cJSON.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "command.h"
20
#include "scheduler.h"
H
Haojun Liao 已提交
21
#include "tdatablock.h"
22
#include "tdataformat.h"
23
#include "tdef.h"
24
#include "tglobal.h"
25
#include "tmsgtype.h"
H
Haojun Liao 已提交
26
#include "tpagedbuf.h"
27
#include "tref.h"
dengyihao's avatar
dengyihao 已提交
28
#include "tsched.h"
X
Xiaoyu Wang 已提交
29

S
Shengliang Guan 已提交
30
static int32_t       initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
31
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest);
H
Haojun Liao 已提交
32

33
// Check that a C string is present, non-empty and not longer than maxsize.
//   str      NUL-terminated string to check; may be NULL.
//   maxsize  largest accepted length in bytes, not counting the terminator.
// Returns true when 1 <= strlen(str) <= maxsize, false otherwise (NULL included).
static bool stringLengthCheck(const char* str, size_t maxsize) {
  if (str == NULL) {
    return false;
  }

  // size_t is unsigned, so an empty string is the only case below the valid range.
  size_t len = strlen(str);
  return len != 0 && len <= maxsize;
}

dengyihao's avatar
dengyihao 已提交
46
// A user name is accepted when it is non-NULL, non-empty and at most TSDB_USER_LEN - 1 bytes long.
static bool validateUserName(const char* user) {
  const size_t maxLen = TSDB_USER_LEN - 1;
  return stringLengthCheck(user, maxLen);
}
47

dengyihao's avatar
dengyihao 已提交
48
// A password is accepted when it is non-NULL, non-empty and at most TSDB_PASSWORD_LEN - 1 bytes long.
static bool validatePassword(const char* passwd) {
  const size_t maxLen = TSDB_PASSWORD_LEN - 1;
  return stringLengthCheck(passwd, maxLen);
}
49

dengyihao's avatar
dengyihao 已提交
50
// A database name is accepted when it is non-NULL, non-empty and at most TSDB_DB_NAME_LEN - 1 bytes long.
static bool validateDbName(const char* db) {
  const size_t maxLen = TSDB_DB_NAME_LEN - 1;
  return stringLengthCheck(db, maxLen);
}
51

52 53 54 55 56 57
// Build the heap-allocated text key "user:auth:ip:port" (truncated to 511 bytes).
//   user, auth, ip  key components; a NULL component is rendered as "(null)".
//   port            port number, printed in decimal.
// Returns a newly allocated string the caller must free, or NULL when strdup() fails.
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
  // Passing NULL for "%s" is undefined behavior, and taos_connect_internal() forwards its
  // `ip` argument here even when it is NULL. The replacement text is spelled "(null)", the
  // way glibc prints a NULL string, so keys built on such platforms keep the same text.
  static const char* const nullText = "(null)";

  char key[512] = {0};
  snprintf(key, sizeof(key), "%s:%s:%s:%d", user != NULL ? user : nullText, auth != NULL ? auth : nullText,
           ip != NULL ? ip : nullText, port);
  return strdup(key);
}

D
dapan1121 已提交
58
bool chkRequestKilled(void* param) {
dengyihao's avatar
dengyihao 已提交
59
  bool         killed = false;
D
dapan1121 已提交
60 61 62 63 64 65
  SRequestObj* pRequest = acquireRequest((int64_t)param);
  if (NULL == pRequest || pRequest->killed) {
    killed = true;
  }

  releaseRequest((int64_t)param);
dengyihao's avatar
dengyihao 已提交
66

D
dapan1121 已提交
67 68 69
  return killed;
}

dengyihao's avatar
dengyihao 已提交
70
static STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
L
Liu Jicong 已提交
71
                                SAppInstInfo* pAppInfo, int connType);
72

73
STscObj* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
dengyihao's avatar
dengyihao 已提交
74
                               uint16_t port, int connType) {
75 76 77 78
  if (taos_init() != TSDB_CODE_SUCCESS) {
    return NULL;
  }

79 80 81 82 83
  if (!validateUserName(user)) {
    terrno = TSDB_CODE_TSC_INVALID_USER_LENGTH;
    return NULL;
  }

84
  char localDb[TSDB_DB_NAME_LEN] = {0};
H
Haojun Liao 已提交
85
  if (db != NULL && strlen(db) > 0) {
dengyihao's avatar
dengyihao 已提交
86
    if (!validateDbName(db)) {
87 88 89 90
      terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH;
      return NULL;
    }

91 92
    tstrncpy(localDb, db, sizeof(localDb));
    strdequote(localDb);
93 94
  }

95
  char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
96 97 98 99 100 101
  if (auth == NULL) {
    if (!validatePassword(pass)) {
      terrno = TSDB_CODE_TSC_INVALID_PASS_LENGTH;
      return NULL;
    }

dengyihao's avatar
dengyihao 已提交
102
    taosEncryptPass_c((uint8_t*)pass, strlen(pass), secretEncrypt);
103 104 105 106
  } else {
    tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt));
  }

107
  SCorEpSet epSet = {0};
S
Shengliang Guan 已提交
108 109 110 111 112 113 114 115 116
  if (ip) {
    if (initEpSetFromCfg(ip, NULL, &epSet) < 0) {
      return NULL;
    }
  } else {
    if (initEpSetFromCfg(tsFirst, tsSecond, &epSet) < 0) {
      return NULL;
    }
  }
117

118 119
  if (port) {
    epSet.epSet.eps[0].port = port;
S
Shengliang Guan 已提交
120
    epSet.epSet.eps[1].port = port;
121 122
  }

123
  char* key = getClusterKey(user, secretEncrypt, ip, port);
124

125
  SAppInstInfo** pInst = NULL;
wafwerar's avatar
wafwerar 已提交
126
  taosThreadMutexLock(&appInfo.mutex);
H
Haojun Liao 已提交
127 128

  pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
L
Liu Jicong 已提交
129
  SAppInstInfo* p = NULL;
130
  if (pInst == NULL) {
wafwerar's avatar
wafwerar 已提交
131
    p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo));
L
Liu Jicong 已提交
132
    p->mgmtEp = epSet;
D
dapan1121 已提交
133
    taosThreadMutexInit(&p->qnodeMutex, NULL);
H
Haojun Liao 已提交
134
    p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
D
dapan 已提交
135
    p->pAppHbMgr = appHbMgrInit(p, key);
H
Haojun Liao 已提交
136
    taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
D
dapan1121 已提交
137 138 139
    p->instKey = key;
    key = NULL;
    tscDebug("new app inst mgr %p, user:%s, ip:%s, port:%d", p, user, ip, port);
dengyihao's avatar
dengyihao 已提交
140

H
Haojun Liao 已提交
141
    pInst = &p;
142 143
  }

wafwerar's avatar
wafwerar 已提交
144
  taosThreadMutexUnlock(&appInfo.mutex);
H
Haojun Liao 已提交
145

wafwerar's avatar
wafwerar 已提交
146
  taosMemoryFreeClear(key);
L
Liu Jicong 已提交
147
  return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst, connType);
148 149
}

X
Xiaoyu Wang 已提交
150 151
// Create a request for `sql` on connection `connId`, store a lower-cased copy of the SQL text
// in it and register the request in the connection's request container.
//   connId       connection id handed to createRequest().
//   sql, sqlLen  SQL text and its length in bytes.
//   param        callback parameter; when NULL an SSyncQueryParam with an initialised
//                semaphore is allocated and used instead.
//   validateSql  stored as the request's validateOnly flag.
//   pRequest     out: the new request, or NULL on failure.
// Returns TSDB_CODE_SUCCESS, terrno when createRequest() fails, or
// TSDB_CODE_TSC_OUT_OF_MEMORY when an allocation or the container insert fails.
int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql,
                     SRequestObj** pRequest) {
  *pRequest = createRequest(connId, TSDB_SQL_SELECT);
  if (*pRequest == NULL) {
    tscError("failed to malloc sqlObj, %s", sql);
    return terrno;
  }

  (*pRequest)->sqlstr = taosMemoryMalloc(sqlLen + 1);
  if ((*pRequest)->sqlstr == NULL) {
    tscError("0x%" PRIx64 " failed to prepare sql string buffer, %s", (*pRequest)->self, sql);
    destroyRequest(*pRequest);
    *pRequest = NULL;
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen);
  (*pRequest)->sqlstr[sqlLen] = 0;
  (*pRequest)->sqlLen = sqlLen;
  (*pRequest)->validateOnly = validateSql;

  if (param == NULL) {
    SSyncQueryParam* pParam = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
    if (pParam == NULL) {
      destroyRequest(*pRequest);
      *pRequest = NULL;
      return TSDB_CODE_TSC_OUT_OF_MEMORY;
    }

    tsem_init(&pParam->sem, 0, 0);
    pParam->pRequest = (*pRequest);
    param = pParam;
  }

  (*pRequest)->body.param = param;

  STscObj* pTscObj = (*pRequest)->pTscObj;
  if (taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self,
                  sizeof((*pRequest)->self))) {
    // `self` is printed with PRIx64 everywhere else in this function; "%d" did not match its
    // width. The connection id is cast so the conversion is well defined for any integer type.
    tscError("0x%" PRIx64 " failed to add to request container, reqId:0x%" PRIx64 ", conn:%" PRId64 ", %s",
             (*pRequest)->self, (*pRequest)->requestId, (int64_t)pTscObj->id, sql);

    destroyRequest(*pRequest);
    *pRequest = NULL;
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  tscDebugL("0x%" PRIx64 " SQL: %s, reqId:0x%" PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId);
  return TSDB_CODE_SUCCESS;
}
200

D
stmt  
dapan1121 已提交
201
// Parse the SQL text stored in pRequest into *pQuery.
// On success, a query with a result set has its schema and precision copied into the request's
// result info. On success or a NEED_CLIENT_HANDLE_ERROR code, the db/table/target-table lists
// are swapped from the query into the request.
// Returns the code from catalogGetHandle() or qParseSql().
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb) {
  STscObj*      pConn = pRequest->pTscObj;
  SAppInstInfo* pApp = pConn->pAppInfo;

  SParseContext ctx = {.requestId = pRequest->requestId,
                       .requestRid = pRequest->self,
                       .acctId = pConn->acctId,
                       .db = pRequest->pDb,
                       .topicQuery = topicQuery,
                       .pSql = pRequest->sqlstr,
                       .sqlLen = pRequest->sqlLen,
                       .pMsg = pRequest->msgBuf,
                       .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
                       .pTransporter = pApp->pTransporter,
                       .pStmtCb = pStmtCb,
                       .pUser = pConn->user,
                       .schemalessType = pConn->schemalessType,
                       .isSuperUser = (0 == strcmp(pConn->user, TSDB_DEFAULT_USER)),
                       .enableSysInfo = pConn->sysInfo,
                       .svrVer = pConn->sVer,
                       .nodeOffline = (pApp->onlineDnodes < pApp->totalDnodes)};
  ctx.mgmtEpSet = getEpSet_s(&pApp->mgmtEp);

  int32_t code = catalogGetHandle(pApp->clusterId, &ctx.pCatalog);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  code = qParseSql(&ctx, pQuery);

  const bool parsedOk = (TSDB_CODE_SUCCESS == code);
  if (parsedOk && (*pQuery)->haveResultSet) {
    SReqResultInfo* pResInfo = &pRequest->body.resInfo;
    setResSchemaInfo(pResInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols);
    setResPrecision(pResInfo, (*pQuery)->precision);
  }

  if (parsedOk || NEED_CLIENT_HANDLE_ERROR(code)) {
    TSWAP(pRequest->dbList, (*pQuery)->pDbList);
    TSWAP(pRequest->tableList, (*pQuery)->pTableList);
    TSWAP(pRequest->targetTableList, (*pQuery)->pTargetTableList);
  }

  // Release the position arrays held in the parse context.
  taosArrayDestroy(ctx.pTableMetaPos);
  taosArrayDestroy(ctx.pTableVgroupPos);

  return code;
}
H
Haojun Liao 已提交
247

248 249
// Run a local command through qExecCommand() and, when it succeeds with a response,
// load that response into the request's result info.
// Returns the code of the last step that ran.
int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
  SRetrieveTableRsp* pRetrieveRsp = NULL;

  int32_t rc = qExecCommand(pRequest->pTscObj->sysInfo, pQuery->pRoot, &pRetrieveRsp);
  if (TSDB_CODE_SUCCESS != rc || NULL == pRetrieveRsp) {
    return rc;
  }

  return setQueryResultFromRsp(&pRequest->body.resInfo, pRetrieveRsp, false, true);
}

X
Xiaoyu Wang 已提交
258
// Send the command message of a DDL query to the server and block on the request's
// response semaphore. Always returns TSDB_CODE_SUCCESS.
int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
  SCmdMsgInfo* pCmd = pQuery->pCmdMsg;
  if (NULL == pCmd) {
    // e.g. "drop table if exists not_exists_table": there is no message to send
    return TSDB_CODE_SUCCESS;
  }

  pRequest->type = pCmd->msgType;

  SDataBuf reqMsg = {.pData = pCmd->pMsg, .len = pCmd->msgLen, .handle = NULL};
  pRequest->body.requestMsg = reqMsg;
  pCmd->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management

  STscObj*      pConn = pRequest->pTscObj;
  SMsgSendInfo* pSendInfo = buildMsgInfoImpl(pRequest);

  int64_t transporterId = 0;
  asyncSendMsgToServer(pConn->pAppInfo->pTransporter, &pCmd->epSet, &transporterId, pSendInfo);

  tsem_wait(&pRequest->body.rspSem);
  return TSDB_CODE_SUCCESS;
}
X
Xiaoyu Wang 已提交
278

279
// Shorthand for pRequest->pTscObj->pAppInfo.
static SAppInstInfo* getAppInfo(SRequestObj* pRequest) {
  STscObj* pConn = pRequest->pTscObj;
  return pConn->pAppInfo;
}
280

281 282
// Asynchronous variant of the local command path: runs the command, stores the outcome in
// the request and reports it through the request's queryFp callback.
// In validate-only mode the callback is invoked with code 0 and nothing is executed.
void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
  if (pRequest->validateOnly) {
    pRequest->body.queryFp(pRequest->body.param, pRequest, 0);
    return;
  }

  SReqResultInfo*    pResInfo = &pRequest->body.resInfo;
  SRetrieveTableRsp* pRetrieveRsp = NULL;

  int32_t code = qExecCommand(pRequest->pTscObj->sysInfo, pQuery->pRoot, &pRetrieveRsp);
  if (TSDB_CODE_SUCCESS == code && NULL != pRetrieveRsp) {
    code = setQueryResultFromRsp(pResInfo, pRetrieveRsp, false, true);
  }

  pRequest->code = code;

  if (TSDB_CODE_SUCCESS == pRequest->code) {
    tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
             pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId);
  } else {
    // a failed command reports no rows
    pResInfo->numOfRows = 0;
    tscError("0x%" PRIx64 " fetch results failed, code:%s, reqId:0x%" PRIx64, pRequest->self, tstrerror(code),
             pRequest->requestId);
  }

  pRequest->body.queryFp(pRequest->body.param, pRequest, code);
}

309
// Asynchronous variant of the DDL path: hands the command message to the transport.
// When there is nothing to send (validate-only mode, or no command message) the queryFp
// callback is invoked with code 0. When the send itself fails, the callback is invoked
// with the failure code, which is also returned.
int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
  // The second operand covers e.g. "drop table if exists not_exists_table".
  if (pRequest->validateOnly || NULL == pQuery->pCmdMsg) {
    pRequest->body.queryFp(pRequest->body.param, pRequest, 0);
    return TSDB_CODE_SUCCESS;
  }

  SCmdMsgInfo* pCmd = pQuery->pCmdMsg;
  pRequest->type = pCmd->msgType;

  SDataBuf reqMsg = {.pData = pCmd->pMsg, .len = pCmd->msgLen, .handle = NULL};
  pRequest->body.requestMsg = reqMsg;
  pCmd->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management

  SAppInstInfo* pApp = getAppInfo(pRequest);
  SMsgSendInfo* pSendInfo = buildMsgInfoImpl(pRequest);

  int64_t transporterId = 0;
  int32_t rc = asyncSendMsgToServer(pApp->pTransporter, &pCmd->epSet, &transporterId, pSendInfo);
  if (rc) {
    pRequest->body.queryFp(pRequest->body.param, pRequest, rc);
  }

  return rc;
}

D
dapan1121 已提交
337
int compareQueryNodeLoad(const void* elem1, const void* elem2) {
L
Liu Jicong 已提交
338 339
  SQueryNodeLoad* node1 = (SQueryNodeLoad*)elem1;
  SQueryNodeLoad* node2 = (SQueryNodeLoad*)elem2;
D
dapan1121 已提交
340 341 342 343

  if (node1->load < node2->load) {
    return -1;
  }
L
Liu Jicong 已提交
344

D
dapan1121 已提交
345 346 347
  return node1->load > node2->load;
}

L
Liu Jicong 已提交
348
// Replace the qnode list cached in pInfo, under pInfo->qnodeMutex.
// The old list (if any) is destroyed. When pNodeList is non-NULL, a copy of it sorted with
// compareQueryNodeLoad becomes the new cached list; otherwise the cache is left empty.
// Always returns TSDB_CODE_SUCCESS.
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
  taosThreadMutexLock(&pInfo->qnodeMutex);

  if (NULL != pInfo->pQnodeList) {
    taosArrayDestroy(pInfo->pQnodeList);
    pInfo->pQnodeList = NULL;
    tscDebug("QnodeList cleared in cluster 0x%" PRIx64, pInfo->clusterId);
  }

  if (NULL != pNodeList) {
    pInfo->pQnodeList = taosArrayDup(pNodeList);
    taosArraySort(pInfo->pQnodeList, compareQueryNodeLoad);
    tscDebug("QnodeList updated in cluster 0x%" PRIx64 ", num:%d", pInfo->clusterId,
             taosArrayGetSize(pInfo->pQnodeList));
  }

  taosThreadMutexUnlock(&pInfo->qnodeMutex);
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
367 368 369 370 371 372
// Tell whether a qnode list still has to be obtained for this request: false under the vnode
// query policy, otherwise true exactly when the app instance has no cached qnode list.
bool qnodeRequired(SRequestObj* pRequest) {
  if (tsQueryPolicy == QUERY_POLICY_VNODE) {
    return false;
  }

  SAppInstInfo* pApp = pRequest->pTscObj->pAppInfo;

  // the cached list is only inspected while holding its mutex
  taosThreadMutexLock(&pApp->qnodeMutex);
  bool missing = (pApp->pQnodeList == NULL);
  taosThreadMutexUnlock(&pApp->qnodeMutex);

  return missing;
}

D
dapan1121 已提交
382
// Produce a qnode list in *pNodeList.
// A copy of the app instance's cached list is used when one exists. Otherwise the list is
// fetched through the catalog and, on success, also stored as the new cached list.
// Returns 0 when the cache was used, else the code of the last catalog/update step.
int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) {
  SAppInstInfo* pApp = pRequest->pTscObj->pAppInfo;
  int32_t       code = 0;

  taosThreadMutexLock(&pApp->qnodeMutex);
  if (pApp->pQnodeList) {
    *pNodeList = taosArrayDup(pApp->pQnodeList);
  }
  taosThreadMutexUnlock(&pApp->qnodeMutex);

  if (NULL != *pNodeList) {
    return code;
  }

  // nothing cached (or the copy is NULL): ask the catalog
  SCatalog* pCtg = NULL;
  code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCtg);
  if (TSDB_CODE_SUCCESS == code) {
    *pNodeList = taosArrayInit(5, sizeof(SQueryNodeLoad));

    SRequestConnInfo connInfo = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
                                 .requestId = pRequest->requestId,
                                 .requestObjRefId = pRequest->self,
                                 .mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)};
    code = catalogGetQnodeList(pCtg, &connInfo, *pNodeList);
  }

  if (TSDB_CODE_SUCCESS == code && *pNodeList) {
    code = updateQnodeList(pApp, *pNodeList);
  }

  return code;
}

D
dapan1121 已提交
412
// Record the query's message type on the request and build the query plan for it.
// Returns the code from qCreateQueryPlan(); the plan is delivered through *pPlan.
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
  pRequest->type = pQuery->msgType;

  SAppInstInfo* pApp = getAppInfo(pRequest);
  STscObj*      pConn = pRequest->pTscObj;

  SPlanContext planCtx = {.queryId = pRequest->requestId,
                          .acctId = pConn->acctId,
                          .mgmtEpSet = getEpSet_s(&pApp->mgmtEp),
                          .pAstRoot = pQuery->pRoot,
                          .showRewrite = pQuery->showRewrite,
                          .pMsg = pRequest->msgBuf,
                          .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
                          .pUser = pConn->user,
                          .sysInfo = pConn->sysInfo};

  return qCreateQueryPlan(&planCtx, pPlan, pNodeList);
}

429
// (Re)build the `fields` and `userFields` arrays of pResInfo from a schema of numOfCols columns.
// Both arrays take type, bytes and name from the schema. In `userFields` the byte count has
// VARSTR_HEADER_SIZE subtracted for VARCHAR, and for NCHAR/JSON it is additionally divided by
// TSDB_NCHAR_SIZE. Previously held arrays are freed first.
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols) {
  ASSERT(pSchema != NULL && numOfCols > 0);

  pResInfo->numOfCols = numOfCols;

  if (NULL != pResInfo->fields) {
    taosMemoryFree(pResInfo->fields);
  }
  if (NULL != pResInfo->userFields) {
    taosMemoryFree(pResInfo->userFields);
  }

  pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
  pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));

  for (int32_t col = 0; col < pResInfo->numOfCols; ++col) {
    const SSchema* pCol = &pSchema[col];
    TAOS_FIELD*    pField = &pResInfo->fields[col];
    TAOS_FIELD*    pUserField = &pResInfo->userFields[col];

    pField->bytes = pCol->bytes;
    pField->type = pCol->type;

    pUserField->bytes = pCol->bytes;
    pUserField->type = pCol->type;

    if (TSDB_DATA_TYPE_VARCHAR == pCol->type) {
      pUserField->bytes -= VARSTR_HEADER_SIZE;
    } else if (TSDB_DATA_TYPE_NCHAR == pCol->type || TSDB_DATA_TYPE_JSON == pCol->type) {
      pUserField->bytes = (pUserField->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
    }

    tstrncpy(pField->name, pCol->name, tListLen(pField->name));
    tstrncpy(pUserField->name, pCol->name, tListLen(pUserField->name));
  }
}

460
// Store `precision` in pResInfo when it is one of the milli/micro/nano time precisions;
// any other value is ignored.
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
  const bool known = (TSDB_TIME_PRECISION_MILLI == precision) || (TSDB_TIME_PRECISION_MICRO == precision) ||
                     (TSDB_TIME_PRECISION_NANO == precision);
  if (known) {
    pResInfo->precision = precision;
  }
}

D
dapan1121 已提交
469 470 471 472 473 474 475 476 477 478 479 480
// Build the execution node list for the vnode policy.
// Every vgroup found in pDbVgList (an array of vgroup arrays) contributes one entry holding
// its vgId and epSet. When that yields no entry, the mnode list is used instead, if it has any.
// The resulting list is returned through *pNodeList; always returns TSDB_CODE_SUCCESS.
int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pDbVgList) {
  SArray* pResult = taosArrayInit(4, sizeof(SQueryNodeLoad));

  int32_t dbCount = taosArrayGetSize(pDbVgList);
  for (int32_t dbIdx = 0; dbIdx < dbCount; ++dbIdx) {
    SArray* pVgroups = taosArrayGetP(pDbVgList, dbIdx);
    int32_t vgCount = taosArrayGetSize(pVgroups);

    // an empty vgroup array simply runs this loop zero times
    for (int32_t vgIdx = 0; vgIdx < vgCount; ++vgIdx) {
      SVgroupInfo* pVgroup = taosArrayGet(pVgroups, vgIdx);

      SQueryNodeLoad entry = {0};
      entry.addr.nodeId = pVgroup->vgId;
      entry.addr.epSet = pVgroup->epSet;

      taosArrayPush(pResult, &entry);
    }
  }

  int32_t vnodeCount = taosArrayGetSize(pResult);
  if (vnodeCount > 0) {
    tscDebug("0x%" PRIx64 " vnode policy, use vnode list, num:%d", pRequest->requestId, vnodeCount);
  } else {
    int32_t mnodeCount = taosArrayGetSize(pMnodeList);
    if (mnodeCount <= 0) {
      tscDebug("0x%" PRIx64 " vnode policy, empty node list", pRequest->requestId);
    } else {
      void* pFirstMnode = taosArrayGet(pMnodeList, 0);
      taosArrayAddBatch(pResult, pFirstMnode, mnodeCount);

      tscDebug("0x%" PRIx64 " vnode policy, use mnode list, num:%d", pRequest->requestId, mnodeCount);
    }
  }

  *pNodeList = pResult;
  return TSDB_CODE_SUCCESS;
}

// Build the execution node list for the qnode policy.
// The qnode list is copied when it has entries; otherwise the mnode list is copied when it
// has entries; otherwise the result stays empty.
// The resulting list is returned through *pNodeList; always returns TSDB_CODE_SUCCESS.
int32_t buildQnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pQnodeList) {
  SArray* pResult = taosArrayInit(4, sizeof(SQueryNodeLoad));

  int32_t qnodeCount = taosArrayGetSize(pQnodeList);
  if (qnodeCount > 0) {
    void* pFirstQnode = taosArrayGet(pQnodeList, 0);
    taosArrayAddBatch(pResult, pFirstQnode, qnodeCount);
    tscDebug("0x%" PRIx64 " qnode policy, use qnode list, num:%d", pRequest->requestId, qnodeCount);
  } else {
    int32_t mnodeCount = taosArrayGetSize(pMnodeList);
    if (mnodeCount <= 0) {
      tscDebug("0x%" PRIx64 " qnode policy, empty node list", pRequest->requestId);
    } else {
      void* pFirstMnode = taosArrayGet(pMnodeList, 0);
      taosArrayAddBatch(pResult, pFirstMnode, mnodeCount);

      tscDebug("0x%" PRIx64 " qnode policy, use mnode list, num:%d", pRequest->requestId, mnodeCount);
    }
  }

  *pNodeList = pResult;
  return TSDB_CODE_SUCCESS;
}

dengyihao's avatar
dengyihao 已提交
543
// Build the execution node list for the asynchronous path according to tsQueryPolicy.
//   vnode policy:        vgroup arrays are collected from pResultMeta->pDbVgroup (entries with
//                        an error code or no payload are skipped).
//   hybrid/qnode policy: the qnode list comes from pResultMeta->pQnodeList when it has
//                        entries, else from the app instance's cached list.
// Returns the code of the policy-specific builder, or TSDB_CODE_TSC_APP_ERROR for an unknown policy.
int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SMetaData* pResultMeta) {
  SArray* pVgLists = NULL;
  SArray* pQnodes = NULL;
  int32_t code = 0;

  switch (tsQueryPolicy) {
    case QUERY_POLICY_VNODE: {
      if (NULL != pResultMeta) {
        pVgLists = taosArrayInit(4, POINTER_BYTES);

        int32_t resCount = taosArrayGetSize(pResultMeta->pDbVgroup);
        for (int32_t idx = 0; idx < resCount; ++idx) {
          SMetaRes* pMetaRes = taosArrayGet(pResultMeta->pDbVgroup, idx);
          if (!pMetaRes->code && NULL != pMetaRes->pRes) {
            taosArrayPush(pVgLists, &pMetaRes->pRes);
          }
        }
      }

      code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pVgLists);
      break;
    }
    case QUERY_POLICY_HYBRID:
    case QUERY_POLICY_QNODE: {
      if (pResultMeta && taosArrayGetSize(pResultMeta->pQnodeList) > 0) {
        SMetaRes* pMetaRes = taosArrayGet(pResultMeta->pQnodeList, 0);
        // a failed meta result leaves pQnodes NULL
        if (!pMetaRes->code) {
          pQnodes = taosArrayDup((SArray*)pMetaRes->pRes);
        }
      } else {
        SAppInstInfo* pApp = pRequest->pTscObj->pAppInfo;
        taosThreadMutexLock(&pApp->qnodeMutex);
        if (pApp->pQnodeList) {
          pQnodes = taosArrayDup(pApp->pQnodeList);
        }
        taosThreadMutexUnlock(&pApp->qnodeMutex);
      }

      code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodes);
      break;
    }
    default:
      tscError("unknown query policy: %d", tsQueryPolicy);
      return TSDB_CODE_TSC_APP_ERROR;
  }

  // Only the container arrays are destroyed here.
  taosArrayDestroy(pVgLists);
  taosArrayDestroy(pQnodes);

  return code;
}

H
Hongze Cheng 已提交
599
void freeVgList(void* list) {
D
dapan1121 已提交
600 601 602 603
  SArray* pList = *(SArray**)list;
  taosArrayDestroy(pList);
}

D
dapan1121 已提交
604 605 606 607
// Build the execution node list for the synchronous path according to tsQueryPolicy.
//   vnode policy:        the vgroup array of every database in pRequest->dbList is fetched
//                        through the catalog; the first failure aborts with that code.
//   hybrid/qnode policy: the qnode list is obtained with getQnodeList().
// Returns the code of the failing catalog call, the code of the policy-specific builder, or
// TSDB_CODE_TSC_APP_ERROR for an unknown policy.
int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList) {
  SArray* pVgLists = NULL;
  SArray* pQnodes = NULL;
  int32_t code = 0;

  switch (tsQueryPolicy) {
    case QUERY_POLICY_VNODE: {
      int32_t dbCount = taosArrayGetSize(pRequest->dbList);
      if (dbCount > 0) {
        SCatalog*     pCatalog = NULL;
        SAppInstInfo* pApp = pRequest->pTscObj->pAppInfo;
        code = catalogGetHandle(pApp->clusterId, &pCatalog);
        if (TSDB_CODE_SUCCESS != code) {
          goto _return;
        }

        pVgLists = taosArrayInit(dbCount, POINTER_BYTES);
        SArray* pVgroups = NULL;
        for (int32_t dbIdx = 0; dbIdx < dbCount; ++dbIdx) {
          char*            dbFName = taosArrayGet(pRequest->dbList, dbIdx);
          SRequestConnInfo connInfo = {.pTrans = pApp->pTransporter,
                                       .requestId = pRequest->requestId,
                                       .requestObjRefId = pRequest->self,
                                       .mgmtEps = getEpSet_s(&pApp->mgmtEp)};

          code = catalogGetDBVgInfo(pCatalog, &connInfo, dbFName, &pVgroups);
          if (code) {
            goto _return;
          }

          taosArrayPush(pVgLists, &pVgroups);
        }
      }

      code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pVgLists);
      break;
    }
    case QUERY_POLICY_HYBRID:
    case QUERY_POLICY_QNODE: {
      // the return code is not inspected; the builder receives whatever list came back
      getQnodeList(pRequest, &pQnodes);

      code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodes);
      break;
    }
    default:
      tscError("unknown query policy: %d", tsQueryPolicy);
      return TSDB_CODE_TSC_APP_ERROR;
  }

_return:

  // freeVgList destroys each collected vgroup array along with the container
  taosArrayDestroyEx(pVgLists, freeVgList);
  taosArrayDestroy(pQnodes);

  return code;
}

D
dapan1121 已提交
661
// Execute a query plan synchronously through the scheduler.
// The execution result is copied into the request's result info. For submit, delete and
// create-table requests the affected row count is recorded and the job is freed right away;
// for submit the rows are also added to the app summary's numOfInsertRows counter.
// Returns the code stored in pRequest->code, which is also written to terrno.
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
  void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;

  SExecResult      execRes = {0};
  SRequestConnInfo connInfo = {.pTrans = pTransporter,
                               .requestId = pRequest->requestId,
                               .requestObjRefId = pRequest->self};
  SSchedulerReq    schedReq = {
         .syncReq = true,
         .pConn = &connInfo,
         .pNodeList = pNodeList,
         .pDag = pDag,
         .sql = pRequest->sqlstr,
         .startTs = pRequest->metric.start,
         .execFp = NULL,
         .cbParam = NULL,
         .chkKillFp = chkRequestKilled,
         .chkKillParam = (void*)pRequest->self,
         .pExecRes = &execRes,
  };

  int32_t code = schedulerExecJob(&schedReq, &pRequest->body.queryJob);
  memcpy(&pRequest->body.resInfo.execRes, &execRes, sizeof(execRes));

  if (TSDB_CODE_SUCCESS != code) {
    schedulerFreeJob(&pRequest->body.queryJob, 0);

    pRequest->code = code;
    terrno = code;
    return pRequest->code;
  }

  const bool isSubmit = (TDMT_VND_SUBMIT == pRequest->type);
  if (isSubmit || TDMT_VND_DELETE == pRequest->type || TDMT_VND_CREATE_TABLE == pRequest->type) {
    pRequest->body.resInfo.numOfRows = execRes.numOfRows;

    if (isSubmit) {
      SAppClusterSummary* pSummary = &pRequest->pTscObj->pAppInfo->summary;
      atomic_add_fetch_64((int64_t*)&pSummary->numOfInsertRows, execRes.numOfRows);
    }

    schedulerFreeJob(&pRequest->body.queryJob, 0);
  }

  pRequest->code = execRes.code;
  terrno = execRes.code;
  return pRequest->code;
}
X
Xiaoyu Wang 已提交
709

710 711 712
// Post-process a submit response.
// For each block: attached table meta is applied to the catalog and then freed, and the
// block's table name and schema version are collected (blocks without a table name are
// skipped). The collected versions are then checked against the catalog.
// Returns TSDB_CODE_SUCCESS when the response has no blocks, TSDB_CODE_OUT_OF_MEMORY when the
// version array cannot be created, else the code from catalogChkTbMetaVersion().
int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
  SSubmitRsp* pSubmitRsp = (SSubmitRsp*)res;
  if (pSubmitRsp->nBlocks <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  SArray* pVersions = taosArrayInit(pSubmitRsp->nBlocks, sizeof(STbSVersion));
  if (NULL == pVersions) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t blkIdx = 0; blkIdx < pSubmitRsp->nBlocks; ++blkIdx) {
    SSubmitBlkRsp* pBlk = pSubmitRsp->pBlocks + blkIdx;

    if (pBlk->pMeta) {
      handleCreateTbExecRes(pBlk->pMeta, pCatalog);
      tFreeSTableMetaRsp(pBlk->pMeta);
      taosMemoryFreeClear(pBlk->pMeta);
    }

    const bool hasName = (NULL != pBlk->tblFName) && (0 != pBlk->tblFName[0]);
    if (hasName) {
      STbSVersion ver = {.tbFName = pBlk->tblFName, .sver = pBlk->sver};
      taosArrayPush(pVersions, &ver);
    }
  }

  SRequestConnInfo connInfo = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
                               .requestId = pRequest->requestId,
                               .requestObjRefId = pRequest->self,
                               .mgmtEps = *epset};

  int32_t code = catalogChkTbMetaVersion(pCatalog, &connInfo, pVersions);

  taosArrayDestroy(pVersions);
  return code;
}
D
dapan1121 已提交
752

753
// Post-process a query execution result: `res` is an array of STbVerInfo whose table name,
// schema version and tag version are checked against the catalog.
// Returns TSDB_CODE_SUCCESS when the array is empty, TSDB_CODE_OUT_OF_MEMORY when the
// version array cannot be created, else the code from catalogChkTbMetaVersion().
int32_t handleQueryExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
  SArray* pTables = (SArray*)res;
  int32_t tableCount = taosArrayGetSize(pTables);
  if (tableCount <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  SArray* pVersions = taosArrayInit(tableCount, sizeof(STbSVersion));
  if (NULL == pVersions) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t tbIdx = 0; tbIdx < tableCount; ++tbIdx) {
    STbVerInfo* pVerInfo = taosArrayGet(pTables, tbIdx);
    STbSVersion ver = {.tbFName = pVerInfo->tbFName, .sver = pVerInfo->sversion, .tver = pVerInfo->tversion};
    taosArrayPush(pVersions, &ver);
  }

  SRequestConnInfo connInfo = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
                               .requestId = pRequest->requestId,
                               .requestObjRefId = pRequest->self,
                               .mgmtEps = *epset};

  int32_t code = catalogChkTbMetaVersion(pCatalog, &connInfo, pVersions);

  taosArrayDestroy(pVersions);
  return code;
}
D
dapan1121 已提交
786

D
dapan1121 已提交
787 788
// Apply the table meta carried by an alter-table result (`res` is an STableMetaRsp) to the catalog.
int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) {
  STableMetaRsp* pMetaRsp = (STableMetaRsp*)res;
  return catalogUpdateTableMeta(pCatalog, pMetaRsp);
}

791 792 793 794
// Apply the table meta carried by a create-table result (`res` is an STableMetaRsp) to the catalog.
int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog) {
  STableMetaRsp* pMetaRsp = (STableMetaRsp*)res;
  return catalogUpdateTableMeta(pCatalog, pMetaRsp);
}

H
Haojun Liao 已提交
795
// Dispatch the execution result stored in the request to the handler for its message type.
// Returns TSDB_CODE_SUCCESS when there is no result payload, the catalogGetHandle() code when
// that fails, the handler's code otherwise, or TSDB_CODE_APP_ERROR for an unexpected type.
int32_t handleQueryExecRsp(SRequestObj* pRequest) {
  SExecResult* pExecRes = &pRequest->body.resInfo.execRes;
  if (NULL == pExecRes->res) {
    return TSDB_CODE_SUCCESS;
  }

  SAppInstInfo* pApp = getAppInfo(pRequest);
  SCatalog*     pCtg = NULL;

  int32_t code = catalogGetHandle(pApp->clusterId, &pCtg);
  if (code) {
    return code;
  }

  SEpSet mgmtEps = getEpSet_s(&pApp->mgmtEp);

  switch (pExecRes->msgType) {
    case TDMT_VND_ALTER_TABLE:
    case TDMT_MND_ALTER_STB: {
      code = handleAlterTbExecRes(pExecRes->res, pCtg);
      break;
    }
    case TDMT_VND_CREATE_TABLE: {
      // the payload is an array of table meta pointers; `code` ends up as the last element's result
      SArray* pMetaList = (SArray*)pExecRes->res;
      int32_t metaCount = taosArrayGetSize(pMetaList);
      for (int32_t idx = 0; idx < metaCount; ++idx) {
        void* pMeta = taosArrayGetP(pMetaList, idx);
        code = handleCreateTbExecRes(pMeta, pCtg);
      }
      break;
    }
    case TDMT_MND_CREATE_STB: {
      code = handleCreateTbExecRes(pExecRes->res, pCtg);
      break;
    }
    case TDMT_VND_SUBMIT: {
      atomic_add_fetch_64((int64_t*)&pApp->summary.insertBytes, pExecRes->numOfBytes);

      code = handleSubmitExecRes(pRequest, pExecRes->res, pCtg, &mgmtEps);
      break;
    }
    case TDMT_SCH_QUERY:
    case TDMT_SCH_MERGE_QUERY: {
      code = handleQueryExecRes(pRequest, pExecRes->res, pCtg, &mgmtEps);
      break;
    }
    default:
      tscError("0x%" PRIx64 ", invalid exec result for request type %d, reqId:0x%" PRIx64, pRequest->self,
               pRequest->type, pRequest->requestId);
      code = TSDB_CODE_APP_ERROR;
  }

  return code;
}
D
dapan1121 已提交
849

D
dapan1121 已提交
850
/**
 * Completion callback handed to the scheduler for an asynchronously executed job.
 *
 * Records the execution status and result on the request, updates the insert
 * statistics, and then either re-submits the statement (for errors the client
 * handles itself) or post-processes the result and invokes the request's
 * query callback.
 *
 * @param pResult execution result, may be NULL; it is copied into the request
 *                and released here
 * @param param   the SRequestObj this job belongs to
 * @param code    execution status reported by the scheduler
 */
void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
  SRequestObj*    pRequest = (SRequestObj*)param;
  SReqResultInfo* pResInfo = &pRequest->body.resInfo;

  pRequest->code = code;
  pRequest->metric.resultReady = taosGetTimestampUs();

  if (pResult != NULL) {
    pResInfo->execRes = *pResult;
  }

  const bool isWriteReq = (TDMT_VND_SUBMIT == pRequest->type) || (TDMT_VND_DELETE == pRequest->type) ||
                          (TDMT_VND_CREATE_TABLE == pRequest->type);
  if (isWriteReq) {
    if (pResult != NULL) {
      pResInfo->numOfRows = pResult->numOfRows;

      // only submit requests contribute to the inserted-rows counter
      if (TDMT_VND_SUBMIT == pRequest->type) {
        SAppClusterSummary* pSummary = &pRequest->pTscObj->pAppInfo->summary;
        atomic_add_fetch_64((int64_t*)&pSummary->numOfInsertRows, pResult->numOfRows);
      }
    }

    schedulerFreeJob(&pRequest->body.queryJob, 0);
    pRequest->metric.execEnd = taosGetTimestampUs();
  }

  // the content was copied into the request above, the container is no longer needed
  taosMemoryFree(pResult);

  tscDebug("0x%" PRIx64 " enter scheduler exec cb, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code,
           tstrerror(code), pRequest->requestId);

  const bool retryInClient =
      (code != TSDB_CODE_SUCCESS) && NEED_CLIENT_HANDLE_ERROR(code) && (pRequest->sqlstr != NULL);
  if (retryInClient) {
    tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64,
             pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
    pRequest->prevCode = code;
    schedulerFreeJob(&pRequest->body.queryJob, 0);
    doAsyncQuery(pRequest, true);
    return;
  }

  tscDebug("schedulerExecCb request type %s", TMSG_INFO(pRequest->type));

  // no execution result for a meta-changing request: drop the cached table meta
  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pResInfo->execRes.res) {
    removeMeta(pRequest->pTscObj, pRequest->targetTableList);
  }

  handleQueryExecRsp(pRequest);

  // return to client
  pRequest->body.queryFp(pRequest->body.param, pRequest, code);
}

H
Haojun Liao 已提交
902 903 904
/**
 * Execute an already parsed query synchronously according to its exec mode.
 *
 * @param pRequest  request the query belongs to
 * @param pQuery    parsed query; destroyed here unless keepQuery is true
 * @param keepQuery when true the caller keeps ownership of pQuery
 * @param res       optional out parameter; when non-NULL it receives the
 *                  execution result pointer, which is then detached from the request
 * @return pRequest (the same pointer that was passed in)
 */
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res) {
  int32_t code = 0;

  if (pQuery->pRoot != NULL) {
    pRequest->stmtType = pQuery->pRoot->type;

    // request counters are only bumped on the first attempt, not on retries
    if (!pRequest->inRetry) {
      SAppClusterSummary* pSummary = &pRequest->pTscObj->pAppInfo->summary;
      if (QUERY_NODE_VNODE_MODIF_STMT == pQuery->pRoot->type) {
        atomic_add_fetch_64((int64_t*)&pSummary->numOfInsertsReq, 1);
      } else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
        atomic_add_fetch_64((int64_t*)&pSummary->numOfQueryReq, 1);
      }
    }
  }

  switch (pQuery->execMode) {
    case QUERY_EXEC_MODE_LOCAL: {
      if (!pRequest->validateOnly) {
        code = execLocalCmd(pRequest, pQuery);
      }
      break;
    }
    case QUERY_EXEC_MODE_RPC: {
      if (!pRequest->validateOnly) {
        code = execDdlQuery(pRequest, pQuery);
      }
      break;
    }
    case QUERY_EXEC_MODE_SCHEDULE: {
      SArray*     pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
      SQueryPlan* pPlan = NULL;

      code = getPlan(pRequest, pQuery, &pPlan, pMnodeList);
      if (TSDB_CODE_SUCCESS == code) {
        pRequest->body.subplanNum = pPlan->numOfSubplans;
      }

      if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
        SArray* pNodeList = NULL;
        buildSyncExecNodeList(pRequest, &pNodeList, pMnodeList);

        code = scheduleQuery(pRequest, pPlan, pNodeList);
        taosArrayDestroy(pNodeList);
      }

      taosArrayDestroy(pMnodeList);
      break;
    }
    case QUERY_EXEC_MODE_EMPTY_RESULT:
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
      break;
    default:
      break;
  }

  if (!keepQuery) {
    qDestroyQuery(pQuery);
  }

  // no execution result for a meta-changing request: drop the cached table meta
  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
    removeMeta(pRequest->pTscObj, pRequest->targetTableList);
  }

  handleQueryExecRsp(pRequest);

  // on failure the request carries terrno, not the local return code
  if (NULL != pRequest && TSDB_CODE_SUCCESS != code) {
    pRequest->code = terrno;
  }

  if (res != NULL) {
    // hand the execution result over to the caller
    *res = pRequest->body.resInfo.execRes.res;
    pRequest->body.resInfo.execRes.res = NULL;
  }

  return pRequest;
}

D
dapan1121 已提交
976
SRequestObj* launchQuery(uint64_t connId, const char* sql, int sqlLen, bool validateOnly, bool inRetry) {
D
stmt  
dapan1121 已提交
977 978 979
  SRequestObj* pRequest = NULL;
  SQuery*      pQuery = NULL;

980
  int32_t code = buildRequest(connId, sql, sqlLen, NULL, validateOnly, &pRequest);
981 982 983 984 985 986 987
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  code = parseSql(pRequest, false, &pQuery, NULL);
  if (code != TSDB_CODE_SUCCESS) {
988 989
    pRequest->code = code;
    return pRequest;
D
stmt  
dapan1121 已提交
990 991
  }

D
dapan1121 已提交
992
  pRequest->inRetry = inRetry;
993 994
  pRequest->stableQuery = pQuery->stableQuery;

995 996 997
  return launchQueryImpl(pRequest, pQuery, false, NULL);
}

dengyihao's avatar
dengyihao 已提交
998
/**
 * Dispatch an already parsed query asynchronously according to its exec mode.
 *
 * - LOCAL:        delegated to asyncExecLocalCmd().
 * - RPC:          delegated to asyncExecDdlQuery().
 * - SCHEDULE:     a query plan is created and, unless the request is
 *                 validate-only or planning failed, submitted to the scheduler
 *                 with schedulerExecCb as completion callback.
 * - EMPTY_RESULT: the query callback is invoked immediately with code 0.
 * - other modes:  the query callback is invoked with -1.
 *
 * @param pRequest    request the query belongs to
 * @param pQuery      parsed query
 * @param pResultMeta meta data passed on to buildAsyncExecNodeList()
 */
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta) {
  int32_t code = 0;

  pRequest->body.execMode = pQuery->execMode;

  switch (pQuery->execMode) {
    case QUERY_EXEC_MODE_LOCAL:
      asyncExecLocalCmd(pRequest, pQuery);
      return;
    case QUERY_EXEC_MODE_RPC:
      code = asyncExecDdlQuery(pRequest, pQuery);
      break;
    case QUERY_EXEC_MODE_SCHEDULE: {
      SArray* pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));

      pRequest->type = pQuery->msgType;

      SPlanContext cxt = {.queryId = pRequest->requestId,
                          .acctId = pRequest->pTscObj->acctId,
                          .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
                          .pAstRoot = pQuery->pRoot,
                          .showRewrite = pQuery->showRewrite,
                          .pMsg = pRequest->msgBuf,
                          .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
                          .pUser = pRequest->pTscObj->user,
                          .sysInfo = pRequest->pTscObj->sysInfo};

      SAppInstInfo* pAppInfo = getAppInfo(pRequest);
      SQueryPlan*   pDag = NULL;
      code = qCreateQueryPlan(&cxt, &pDag, pMnodeList);
      if (code) {
        tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
                 pRequest->requestId);
      } else {
        pRequest->body.subplanNum = pDag->numOfSubplans;
      }

      pRequest->metric.planEnd = taosGetTimestampUs();

      if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
        SArray* pNodeList = NULL;
        buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta);

        SRequestConnInfo conn = {
            .pTrans = pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self};
        SSchedulerReq req = {
            .syncReq = false,
            .pConn = &conn,
            .pNodeList = pNodeList,
            .pDag = pDag,
            .sql = pRequest->sqlstr,
            .startTs = pRequest->metric.start,
            .execFp = schedulerExecCb,
            .cbParam = pRequest,
            .chkKillFp = chkRequestKilled,
            .chkKillParam = (void*)pRequest->self,
            .pExecRes = NULL,
        };
        code = schedulerExecJob(&req, &pRequest->body.queryJob);
        taosArrayDestroy(pNodeList);
      } else {
        tscDebug("0x%" PRIx64 " plan not executed, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
                 pRequest->requestId);

        // Record the failure and release local resources BEFORE invoking the
        // callback: the callback completes the request, so the request must
        // not be written to once it has run.
        if (TSDB_CODE_SUCCESS != code) {
          pRequest->code = terrno;
        }
        taosArrayDestroy(pMnodeList);

        pRequest->body.queryFp(pRequest->body.param, pRequest, code);
        return;
      }

      // todo not to be released here
      taosArrayDestroy(pMnodeList);
      break;
    }
    case QUERY_EXEC_MODE_EMPTY_RESULT:
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
      pRequest->body.queryFp(pRequest->body.param, pRequest, 0);
      break;
    default:
      pRequest->body.queryFp(pRequest->body.param, pRequest, -1);
      break;
  }

  // on failure the request carries terrno, not the local return code
  if (TSDB_CODE_SUCCESS != code) {
    pRequest->code = terrno;
  }
}

D
dapan1121 已提交
1082
/**
 * Refresh the cached vgroup info of every database and the cached meta of
 * every table recorded on the request.
 *
 * @return TSDB_CODE_QRY_APP_ERROR when the request lists neither databases nor
 *         tables; otherwise the first error encountered or TSDB_CODE_SUCCESS
 */
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
  const int32_t numOfDbs = taosArrayGetSize(pRequest->dbList);
  const int32_t numOfTables = taosArrayGetSize(pRequest->tableList);

  if (numOfDbs <= 0 && numOfTables <= 0) {
    return TSDB_CODE_QRY_APP_ERROR;
  }

  SCatalog* pCatalog = NULL;
  int32_t   code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  // both loops stop at the first failure
  for (int32_t d = 0; TSDB_CODE_SUCCESS == code && d < numOfDbs; ++d) {
    char* dbFName = taosArrayGet(pRequest->dbList, d);
    code = catalogRefreshDBVgInfo(pCatalog, &conn, dbFName);
  }

  for (int32_t t = 0; TSDB_CODE_SUCCESS == code && t < numOfTables; ++t) {
    SName* pTableName = taosArrayGet(pRequest->tableList, t);
    code = catalogRefreshTableMeta(pCatalog, &conn, pTableName, -1);
  }

  return code;
}

D
dapan1121 已提交
1123 1124
/**
 * Remove the cached meta of every table in tbList from the catalog of the
 * connection's cluster.
 *
 * @return the catalogGetHandle() error when the catalog cannot be obtained,
 *         TSDB_CODE_SUCCESS otherwise (results of the individual removals are
 *         not inspected)
 */
int32_t removeMeta(STscObj* pTscObj, SArray* tbList) {
  const int32_t numOfTables = taosArrayGetSize(tbList);

  SCatalog* pCatalog = NULL;
  int32_t   code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  int32_t idx = 0;
  while (idx < numOfTables) {
    SName* pName = taosArrayGet(tbList, idx);
    catalogRemoveTableMeta(pCatalog, pName);
    ++idx;
  }

  return TSDB_CODE_SUCCESS;
}

1139 1140
//  todo remove it soon
SRequestObj* execQuery(uint64_t connId, const char* sql, int sqlLen, bool validateOnly) {
D
dapan1121 已提交
1141
  SRequestObj* pRequest = NULL;
L
Liu Jicong 已提交
1142 1143
  int32_t      retryNum = 0;
  int32_t      code = 0;
D
dapan1121 已提交
1144
  bool         inRetry = false;
D
dapan1121 已提交
1145

1146 1147
  do {
    destroyRequest(pRequest);
D
dapan1121 已提交
1148
    pRequest = launchQuery(connId, sql, sqlLen, validateOnly, inRetry);
1149
    if (pRequest == NULL || TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) {
D
dapan1121 已提交
1150 1151 1152
      break;
    }

1153
    code = refreshMeta(pRequest->pTscObj, pRequest);
D
dapan1121 已提交
1154 1155
    if (code) {
      pRequest->code = code;
D
dapan1121 已提交
1156 1157
      break;
    }
D
dapan1121 已提交
1158 1159

    inRetry = true;
1160
  } while (retryNum++ < REQUEST_TOTAL_EXEC_TIMES);
L
Liu Jicong 已提交
1161

D
dapan1121 已提交
1162 1163 1164
  return pRequest;
}

S
Shengliang Guan 已提交
1165 1166
/**
 * Initialise the mnode endpoint set from the configured first/second endpoint.
 *
 * @param firstEp  first endpoint string, may be NULL or empty
 * @param secondEp second endpoint string, may be NULL or empty
 * @param pEpSet   endpoint set to fill; version, inUse and numOfEps are reset
 * @return 0 on success; a non-zero value with terrno set to
 *         TSDB_CODE_TSC_INVALID_FQDN when an endpoint is too long, the first
 *         endpoint cannot be parsed, or no usable endpoint remains
 */
int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
  pEpSet->version = 0;

  // init mnode ip set
  SEpSet* mgmtEpSet = &(pEpSet->epSet);
  mgmtEpSet->numOfEps = 0;
  mgmtEpSet->inUse = 0;

  if (firstEp && firstEp[0] != 0) {
    if (strlen(firstEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
    }

    int32_t code = taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[0]);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return terrno;
    }

    mgmtEpSet->numOfEps++;
  }

  if (secondEp && secondEp[0] != 0) {
    if (strlen(secondEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
    }

    // Only count the second endpoint when it was parsed successfully; a bad
    // second endpoint is skipped so that a valid first endpoint stays usable.
    int32_t code = taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
    if (code == TSDB_CODE_SUCCESS) {
      mgmtEpSet->numOfEps++;
    } else {
      tscError("failed to parse second ep:%s, it is ignored", secondEp);
    }
  }

  if (mgmtEpSet->numOfEps == 0) {
    terrno = TSDB_CODE_TSC_INVALID_FQDN;
    return -1;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
1206
STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
L
Liu Jicong 已提交
1207
                         SAppInstInfo* pAppInfo, int connType) {
1208
  STscObj* pTscObj = createTscObj(user, auth, db, connType, pAppInfo);
1209 1210 1211 1212 1213
  if (NULL == pTscObj) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return pTscObj;
  }

1214
  SRequestObj* pRequest = createRequest(pTscObj->id, TDMT_MND_CONNECT);
1215 1216
  if (pRequest == NULL) {
    destroyTscObj(pTscObj);
1217 1218 1219
    return NULL;
  }

1220
  SMsgSendInfo* body = buildConnectMsg(pRequest);
1221 1222

  int64_t transporterId = 0;
H
Haojun Liao 已提交
1223
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
1224 1225 1226

  tsem_wait(&pRequest->body.rspSem);
  if (pRequest->code != TSDB_CODE_SUCCESS) {
dengyihao's avatar
dengyihao 已提交
1227 1228
    const char* errorMsg =
        (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
1229
    fprintf(stderr, "failed to connect to server, reason: %s\n\n", errorMsg);
1230

1231
    terrno = pRequest->code;
1232
    destroyRequest(pRequest);
1233
    taos_close_internal(pTscObj);
1234 1235
    pTscObj = NULL;
  } else {
D
dapan1121 已提交
1236
    tscDebug("0x%" PRIx64 " connection is opening, connId:%u, dnodeConn:%p, reqId:0x%" PRIx64, pTscObj->id,
dengyihao's avatar
dengyihao 已提交
1237
             pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId);
1238 1239 1240 1241 1242 1243
    destroyRequest(pRequest);
  }

  return pTscObj;
}

1244
/**
 * Build the TDMT_MND_CONNECT message for a request.
 *
 * The connect request is filled from the request's connection object
 * (database, connection type, user, password) and the global application info
 * (pid, start time, application name), then serialized into a freshly
 * allocated buffer attached to the returned send info.
 *
 * @return the send info, or NULL with terrno set to TSDB_CODE_TSC_OUT_OF_MEMORY
 *         when an allocation fails
 */
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  pMsgSendInfo->msgType = TDMT_MND_CONNECT;

  pMsgSendInfo->requestObjRefId = pRequest->self;
  pMsgSendInfo->requestId = pRequest->requestId;
  pMsgSendInfo->fp = getMsgRspHandle(pMsgSendInfo->msgType);
  pMsgSendInfo->param = pRequest;

  SConnectReq connectReq = {0};
  STscObj*    pObj = pRequest->pTscObj;

  char* db = getDbOfConnection(pObj);
  if (db != NULL) {
    tstrncpy(connectReq.db, db, sizeof(connectReq.db));
  }
  taosMemoryFreeClear(db);

  connectReq.connType = pObj->connType;
  connectReq.pid = appInfo.pid;
  connectReq.startTime = appInfo.startTime;

  tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));
  tstrncpy(connectReq.user, pObj->user, sizeof(connectReq.user));
  tstrncpy(connectReq.passwd, pObj->pass, sizeof(connectReq.passwd));

  // first pass computes the serialized length, second pass writes the payload
  int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
  void*   pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    // only the send info itself has been allocated so far; its fields hold
    // scalars and borrowed pointers, so releasing the struct is sufficient
    taosMemoryFree(pMsgSendInfo);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }
  tSerializeSConnectReq(pReq, contLen, &connectReq);

  pMsgSendInfo->msgInfo.len = contLen;
  pMsgSendInfo->msgInfo.pData = pReq;
  return pMsgSendInfo;
}

D
dapan1121 已提交
1284 1285 1286 1287
/**
 * Apply an endpoint-set change reported together with a response.
 *
 * For mnode targets the application's mgmt endpoint set is replaced; for vnode
 * targets the catalog's vgroup endpoint set is updated and the target database
 * name stored on the send info is released. Other target types are only logged.
 *
 * @param pSendInfo send info of the message the response belongs to
 * @param pTscObj   connection the request belongs to, may be NULL
 * @param pMsg      the response message (used for logging)
 * @param pEpSet    new endpoint set; nothing happens when NULL
 */
void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg, SEpSet* pEpSet) {
  if (NULL == pEpSet) {
    return;
  }

  if (TARGET_TYPE_MNODE == pSendInfo->target.type) {
    if (NULL == pTscObj) {
      tscError("mnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
               TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
      return;
    }

    SEpSet* pCurSet = &pTscObj->pAppInfo->mgmtEp.epSet;
    SEp*    pCurEp = &pCurSet->eps[pCurSet->inUse];
    SEp*    pNextEp = &pEpSet->eps[pEpSet->inUse];
    tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in client", pCurSet->inUse, pCurSet->numOfEps,
             pCurEp->fqdn, pCurEp->port, pEpSet->inUse, pEpSet->numOfEps, pNextEp->fqdn, pNextEp->port);
    updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
    return;
  }

  if (TARGET_TYPE_VNODE == pSendInfo->target.type) {
    if (NULL == pTscObj) {
      tscError("vnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
               TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
      return;
    }

    SCatalog* pCatalog = NULL;
    int32_t   code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
    if (TSDB_CODE_SUCCESS != code) {
      tscError("fail to get catalog handle, clusterId:%" PRIx64 ", error %s", pTscObj->pAppInfo->clusterId,
               tstrerror(code));
      return;
    }

    catalogUpdateVgEpSet(pCatalog, pSendInfo->target.dbFName, pSendInfo->target.vgId, pEpSet);
    taosMemoryFreeClear(pSendInfo->target.dbFName);
    return;
  }

  tscDebug("epset changed, not updated, msgType %s", TMSG_INFO(pMsg->msgType));
}

dengyihao's avatar
dengyihao 已提交
1329
int32_t doProcessMsgFromServer(void* param) {
dengyihao's avatar
dengyihao 已提交
1330
  AsyncArg* arg = (AsyncArg*)param;
dengyihao's avatar
dengyihao 已提交
1331 1332 1333
  SRpcMsg*  pMsg = &arg->msg;
  SEpSet*   pEpSet = arg->pEpset;

S
Shengliang Guan 已提交
1334 1335
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
  assert(pMsg->info.ahandle != NULL);
D
dapan1121 已提交
1336
  STscObj* pTscObj = NULL;
1337

dengyihao's avatar
dengyihao 已提交
1338 1339 1340 1341
  STraceId* trace = &pMsg->info.traceId;
  char      tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

D
dapan1121 已提交
1342 1343
  tscDebug("processMsgFromServer handle %p, message: %s, size:%d, code: %s, gtid: %s", pMsg->info.handle,
           TMSG_INFO(pMsg->msgType), pMsg->contLen, tstrerror(pMsg->code), tbuf);
D
dapan1121 已提交
1344

1345
  if (pSendInfo->requestObjRefId != 0) {
dengyihao's avatar
dengyihao 已提交
1346
    SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
D
dapan1121 已提交
1347 1348 1349 1350 1351 1352 1353 1354 1355 1356 1357
    if (pRequest) {
      assert(pRequest->self == pSendInfo->requestObjRefId);

      pRequest->metric.rsp = taosGetTimestampUs();
      pTscObj = pRequest->pTscObj;
      /*
       * There is not response callback function for submit response.
       * The actual inserted number of points is the first number.
       */
      int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start;
      if (pMsg->code == TSDB_CODE_SUCCESS) {
D
dapan1121 已提交
1358
        tscDebug("0x%" PRIx64 " rsp msg:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%" PRIx64, pRequest->self,
D
dapan1121 已提交
1359 1360
                 TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed / 1000, pRequest->requestId);
      } else {
D
dapan1121 已提交
1361
        tscError("0x%" PRIx64 " rsp msg:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%" PRIx64, pRequest->self,
D
dapan1121 已提交
1362 1363
                 TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed / 1000, pRequest->requestId);
      }
1364

D
dapan1121 已提交
1365 1366
      taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
    }
1367 1368
  }

D
dapan1121 已提交
1369 1370
  updateTargetEpSet(pSendInfo, pTscObj, pMsg, pEpSet);

dengyihao's avatar
dengyihao 已提交
1371 1372
  SDataBuf buf = {
      .msgType = pMsg->msgType, .len = pMsg->contLen, .pData = NULL, .handle = pMsg->info.handle, .pEpSet = pEpSet};
1373 1374

  if (pMsg->contLen > 0) {
wafwerar's avatar
wafwerar 已提交
1375
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
1376 1377 1378 1379 1380 1381
    if (buf.pData == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    }
1382 1383
  }

1384
  pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
1385
  rpcFreeCont(pMsg->pCont);
1386
  destroySendMsgInfo(pSendInfo);
dengyihao's avatar
dengyihao 已提交
1387
  taosMemoryFree(arg);
dengyihao's avatar
dengyihao 已提交
1388
  return TSDB_CODE_SUCCESS;
dengyihao's avatar
dengyihao 已提交
1389 1390 1391
}

/**
 * Entry point for a response message: the message and a private copy of the
 * endpoint set are queued for doProcessMsgFromServer() via taosAsyncExec().
 *
 * Allocation failures are handled instead of dereferencing NULL:
 * - when the endpoint-set copy cannot be allocated, the response is still
 *   delivered, just without an endpoint-set update;
 * - when the async argument cannot be allocated, the response handler is
 *   invoked directly with TSDB_CODE_OUT_OF_MEMORY and the message resources
 *   are released, mirroring the tail of doProcessMsgFromServer().
 *
 * @param parent unused here
 * @param pMsg   the response message
 * @param pEpSet endpoint set reported with the response, may be NULL
 */
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  SEpSet* tEpSet = NULL;
  if (pEpSet != NULL) {
    tEpSet = taosMemoryCalloc(1, sizeof(SEpSet));
    if (tEpSet == NULL) {
      tscError("failed to copy epset of msg:%s since out of memory, epset update is skipped",
               TMSG_INFO(pMsg->msgType));
    } else {
      memcpy((void*)tEpSet, (void*)pEpSet, sizeof(SEpSet));
    }
  }

  AsyncArg* arg = taosMemoryCalloc(1, sizeof(AsyncArg));
  if (arg == NULL) {
    tscError("failed to queue msg:%s since out of memory, error is delivered directly", TMSG_INFO(pMsg->msgType));
    terrno = TSDB_CODE_OUT_OF_MEMORY;

    SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
    if (pSendInfo != NULL) {
      // NOTE(review): tEpSet is passed on exactly like the regular path does;
      // confirm that the response handlers take ownership of buf.pEpSet.
      SDataBuf buf = {
          .msgType = pMsg->msgType, .len = 0, .pData = NULL, .handle = pMsg->info.handle, .pEpSet = tEpSet};
      pSendInfo->fp(pSendInfo->param, &buf, TSDB_CODE_OUT_OF_MEMORY);
      rpcFreeCont(pMsg->pCont);
      destroySendMsgInfo(pSendInfo);
    } else {
      rpcFreeCont(pMsg->pCont);
      taosMemoryFree(tEpSet);
    }
    return;
  }

  arg->msg = *pMsg;
  arg->pEpset = tEpSet;

  taosAsyncExec(doProcessMsgFromServer, arg, NULL);
}
1404

dengyihao's avatar
dengyihao 已提交
1405
TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) {
1406 1407 1408 1409 1410 1411 1412 1413 1414 1415
  tscDebug("try to connect to %s:%u by auth, user:%s db:%s", ip, port, user, db);
  if (user == NULL) {
    user = TSDB_DEFAULT_USER;
  }

  if (auth == NULL) {
    tscError("No auth info is given, failed to connect to server");
    return NULL;
  }

1416 1417
  STscObj* pObj = taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY);
  if (pObj) {
dengyihao's avatar
dengyihao 已提交
1418
    int64_t* rid = taosMemoryCalloc(1, sizeof(int64_t));
D
dapan1121 已提交
1419 1420
    *rid = pObj->id;
    return (TAOS*)rid;
1421
  }
dengyihao's avatar
dengyihao 已提交
1422

D
dapan1121 已提交
1423
  return NULL;
1424 1425
}

dengyihao's avatar
dengyihao 已提交
1426 1427 1428
TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen,
                     const char* db, int dbLen, uint16_t port) {
  char ipStr[TSDB_EP_LEN] = {0};
1429
  char dbStr[TSDB_DB_NAME_LEN] = {0};
dengyihao's avatar
dengyihao 已提交
1430 1431
  char userStr[TSDB_USER_LEN] = {0};
  char passStr[TSDB_PASSWORD_LEN] = {0};
1432

dengyihao's avatar
dengyihao 已提交
1433
  strncpy(ipStr, ip, TMIN(TSDB_EP_LEN - 1, ipLen));
dengyihao's avatar
dengyihao 已提交
1434 1435
  strncpy(userStr, user, TMIN(TSDB_USER_LEN - 1, userLen));
  strncpy(passStr, pass, TMIN(TSDB_PASSWORD_LEN - 1, passLen));
dengyihao's avatar
dengyihao 已提交
1436
  strncpy(dbStr, db, TMIN(TSDB_DB_NAME_LEN - 1, dbLen));
1437
  return taos_connect(ipStr, userStr, passStr, dbStr, port);
1438 1439
}

L
Liu Jicong 已提交
1440
/**
 * Point row[] / length[] of the result info at the values of the current row.
 *
 * For every column the entry is NULL with length 0 when the value is null;
 * otherwise it addresses the value inside the column data (variable-length
 * columns via their offset table, fixed-length columns via index * bytes).
 */
void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
  for (int32_t c = 0; c < pResultInfo->numOfCols; ++c) {
    SResultColumn* pColumn = &pResultInfo->pCol[c];

    const int32_t colType = pResultInfo->fields[c].type;
    const int32_t colBytes = pResultInfo->fields[c].bytes;

    // start from "null value" and overwrite when an actual value is present
    pResultInfo->row[c] = NULL;
    pResultInfo->length[c] = 0;

    if (IS_VAR_DATA_TYPE(colType)) {
      if (IS_VAR_NULL_TYPE(colType, colBytes) || pColumn->offset[pResultInfo->current] == -1) {
        continue;
      }

      char* pVal = pColumn->offset[pResultInfo->current] + pColumn->pData;
      pResultInfo->length[c] = varDataLen(pVal);
      pResultInfo->row[c] = varDataVal(pVal);
      continue;
    }

    if (colDataIsNull_f(pColumn->nullbitmap, pResultInfo->current)) {
      continue;
    }

    pResultInfo->row[c] = pColumn->pData + colBytes * pResultInfo->current;
    pResultInfo->length[c] = colBytes;
  }
}

1469
/**
 * Synchronously provide the next row (or block) of a query result.
 *
 * When the locally buffered block is exhausted a new block is fetched from the
 * scheduler and decoded into the result info.
 *
 * @param pRequest       request to fetch from, must not be NULL
 * @param setupOneRowPtr when true, row[] is pointed at the current row and the
 *                       cursor is advanced by one
 * @param convertUcs4    passed through to setQueryResultFromRsp()
 * @return the row pointer array, or NULL when no rows are left or an error
 *         occurred (the error is stored in pRequest->code)
 */
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
  assert(pRequest != NULL);

  SReqResultInfo* pInfo = &pRequest->body.resInfo;

  const bool blockExhausted = (pInfo->pData == NULL) || (pInfo->current >= pInfo->numOfRows);
  if (blockExhausted) {
    // All data has returned to App already, no need to try again
    if (pInfo->completed) {
      pInfo->numOfRows = 0;
      return NULL;
    }

    SSchedulerReq req = {
        .syncReq = true,
        .pFetchRes = (void**)&pInfo->pData,
    };

    pRequest->code = schedulerFetchRows(pRequest->body.queryJob, &req);
    if (TSDB_CODE_SUCCESS != pRequest->code) {
      pInfo->numOfRows = 0;
      return NULL;
    }

    pRequest->code = setQueryResultFromRsp(pInfo, (SRetrieveTableRsp*)pInfo->pData, convertUcs4, true);
    if (TSDB_CODE_SUCCESS != pRequest->code) {
      pInfo->numOfRows = 0;
      return NULL;
    }

    tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
             pRequest->self, pInfo->numOfRows, pInfo->totalRows, pInfo->completed, pRequest->requestId);

    // account the fetched payload in the cluster summary
    SAppClusterSummary* pSummary = &pRequest->pTscObj->pAppInfo->summary;
    atomic_add_fetch_64((int64_t*)&pSummary->fetchBytes, pInfo->payloadLen);

    if (0 == pInfo->numOfRows) {
      return NULL;
    }
  }

  if (setupOneRowPtr) {
    doSetOneRowPtr(pInfo);
    pInfo->current += 1;
  }

  return pInfo->row;
}

H
Haojun Liao 已提交
1518 1519 1520
/**
 * Fetch callback used by doAsyncFetchRows(): posts the semaphore of the
 * SSyncQueryParam passed as param. res and numOfRows are not used.
 */
static void syncFetchFn(void* param, TAOS_RES* res, int32_t numOfRows) {
  tsem_post(&((SSyncQueryParam*)param)->sem);
}
D
dapan1121 已提交
1522

1523
/**
 * Provide the next row (or block) of a query result by running the
 * asynchronous fetch and waiting for its completion.
 *
 * @param pRequest       request to fetch from, must not be NULL
 * @param setupOneRowPtr when true, row[] is pointed at the current row and the
 *                       cursor is advanced by one
 * @param convertUcs4    stored on the result info before fetching
 * @return the row pointer array, or NULL when there are no rows or the request
 *         carries an error code
 */
void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
  assert(pRequest != NULL);

  SReqResultInfo* pInfo = &pRequest->body.resInfo;

  const bool blockExhausted = (pInfo->pData == NULL) || (pInfo->current >= pInfo->numOfRows);
  if (blockExhausted) {
    // All data has returned to App already, no need to try again
    if (pInfo->completed) {
      pInfo->numOfRows = 0;
      return NULL;
    }

    // convert ucs4 to native multi-bytes string
    pInfo->convertUcs4 = convertUcs4;

    // start the async fetch and block until syncFetchFn posts the semaphore
    SSyncQueryParam* pSyncParam = pRequest->body.param;
    taos_fetch_rows_a(pRequest, syncFetchFn, pSyncParam);
    tsem_wait(&pSyncParam->sem);
  }

  if (0 == pInfo->numOfRows || TSDB_CODE_SUCCESS != pRequest->code) {
    return NULL;
  }

  if (setupOneRowPtr) {
    doSetOneRowPtr(pInfo);
    pInfo->current += 1;
  }

  return pInfo->row;
}

1554
/**
 * Lazily allocate the per-column helper arrays of a result info (row, pCol,
 * length, convertBuf), each sized by numOfCols.
 *
 * Nothing is done when row is already allocated. When any allocation fails,
 * everything allocated here is released and reset to NULL, so that a later
 * call retries the allocation instead of treating a half-initialised result
 * info as ready.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY on allocation failure
 */
static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
  if (pResInfo->row != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pResInfo->row = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
  pResInfo->pCol = taosMemoryCalloc(pResInfo->numOfCols, sizeof(SResultColumn));
  pResInfo->length = taosMemoryCalloc(pResInfo->numOfCols, sizeof(int32_t));
  pResInfo->convertBuf = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);

  if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL || pResInfo->convertBuf == NULL) {
    taosMemoryFreeClear(pResInfo->row);
    taosMemoryFreeClear(pResInfo->pCol);
    taosMemoryFreeClear(pResInfo->length);
    taosMemoryFreeClear(pResInfo->convertBuf);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

1569 1570 1571 1572 1573
/*
 * Convert every non-empty NCHAR column of the current batch from UCS-4 to a
 * multi-byte string, writing the converted values into pResultInfo->convertBuf[i]
 * and re-pointing the column data, the row pointer and the per-row offsets at it.
 *
 * pResultInfo  result whose fields/pCol/row/convertBuf arrays are used
 * numOfRows    number of rows in the batch
 * numOfCols    number of columns in the batch
 * colLength    per-column payload length in bytes; also the size of each conversion buffer
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when a conversion buffer
 * cannot be (re)allocated.
 */
static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int32_t numOfCols, int32_t* colLength) {
  for (int32_t i = 0; i < numOfCols; ++i) {
    int32_t type = pResultInfo->fields[i].type;
    int32_t bytes = pResultInfo->fields[i].bytes;

    if (type != TSDB_DATA_TYPE_NCHAR || colLength[i] <= 0) {
      continue;
    }

    // keep the old buffer reachable until the reallocation is known to have succeeded
    char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]);
    if (p == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    pResultInfo->convertBuf[i] = p;

    SResultColumn* pCol = &pResultInfo->pCol[i];
    for (int32_t j = 0; j < numOfRows; ++j) {
      if (pCol->offset[j] == -1) {  // -1 marks a row without a value
        continue;
      }

      char* pStart = pCol->offset[j] + pCol->pData;

      int32_t len = taosUcs4ToMbs((TdUcs4*)varDataVal(pStart), varDataLen(pStart), varDataVal(p));
      if (len < 0) {
        // a failed conversion must not be used as a length: it would corrupt the
        // varstr header and move the write cursor the wrong way. Emit an empty
        // value instead, as the JSON conversion in this file does.
        tscError("charset:%s to %s. convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset);
        len = 0;
      }
      ASSERT(len <= bytes);
      ASSERT((p + len) < (pResultInfo->convertBuf[i] + colLength[i]));

      varDataSetLen(p, len);
      pCol->offset[j] = (p - pResultInfo->convertBuf[i]);
      p += (len + VARSTR_HEADER_SIZE);
    }

    pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
    pResultInfo->row[i] = pResultInfo->pCol[i].pData;
  }

  return TSDB_CODE_SUCCESS;
}
1604

1605
/*
 * Size in bytes of the fixed leading part of a version-1 data block: five int32
 * header fields, one uint64 field, and one (int8 + int32) entry per column.
 *
 * p          start of the block; the column count stored after the first three
 *            int32 fields is asserted to equal numOfCols
 * numOfCols  expected number of columns
 */
int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols) {
  const size_t colCountOffset = sizeof(int32_t) * 3;
  int32_t      storedCols = *(int32_t*)(p + colCountOffset);
  ASSERT(numOfCols == storedCols);

  const size_t fixedHeaderBytes = sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) * 3 + sizeof(uint64_t);
  const size_t perColumnBytes = sizeof(int8_t) + sizeof(int32_t);

  return fixedHeaderBytes + numOfCols * perColumnBytes;
}

dengyihao's avatar
dengyihao 已提交
1613
/*
 * Upper bound, in bytes, of the block produced when the JSON columns of the
 * current batch are rewritten as printable strings.
 *
 * pResultInfo  result whose pData points at the raw block and whose fields
 *              describe the column types
 * numOfCols    number of columns in the block
 * numOfRows    number of rows in the block
 *
 * Non-JSON columns contribute their existing size. Each non-null JSON value
 * contributes a varstr header plus a bound on its textual form.
 */
static int32_t estimateJsonLen(SReqResultInfo* pResultInfo, int32_t numOfCols, int32_t numOfRows) {
  // Longest output of "%.9lf" (the format doConvertJson uses for doubles):
  // sign + 309 integer digits of DBL_MAX + '.' + 9 fraction digits.
  // The previous bound of 32 was too small for large magnitudes.
  enum { JSON_DOUBLE_STR_MAX = 320 };

  char* p = (char*)pResultInfo->pData;

  // version + length + numOfRows + numOfCol + groupId + flag_segment + column_info
  int32_t  len = getVersion1BlockMetaSize(p, numOfCols);
  int32_t* colLength = (int32_t*)(p + len);
  len += sizeof(int32_t) * numOfCols;

  char* pStart = p + len;
  for (int32_t i = 0; i < numOfCols; ++i) {
    int32_t colLen = htonl(colLength[i]);

    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
      // the per-row offset array is kept as is
      int32_t* offset = (int32_t*)pStart;
      int32_t  lenTmp = numOfRows * sizeof(int32_t);
      len += lenTmp;
      pStart += lenTmp;

      for (int32_t j = 0; j < numOfRows; ++j) {
        if (offset[j] == -1) {  // row has no value
          continue;
        }
        char* data = offset[j] + pStart;

        // first byte is the inner type tag, the payload follows it
        int32_t jsonInnerType = *data;
        char*   jsonInnerData = data + CHAR_BYTES;
        if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
          len += (VARSTR_HEADER_SIZE + strlen(TSDB_DATA_NULL_STR_L));
        } else if (tTagIsJson(data)) {
          len += (VARSTR_HEADER_SIZE + ((const STag*)(data))->len);
        } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
          len += varDataTLen(jsonInnerData) + CHAR_BYTES * 2;
        } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
          len += (VARSTR_HEADER_SIZE + JSON_DOUBLE_STR_MAX);
        } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
          len += (VARSTR_HEADER_SIZE + 5);  // "false" is the longer literal
        } else {
          ASSERT(0);
        }
      }
    } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
      // offset array + payload, copied unchanged
      int32_t lenTmp = numOfRows * sizeof(int32_t);
      len += (lenTmp + colLen);
      pStart += lenTmp;
    } else {
      // null bitmap + payload, copied unchanged
      int32_t lenTmp = BitmapLen(pResultInfo->numOfRows);
      len += (lenTmp + colLen);
      pStart += lenTmp;
    }
    pStart += colLen;
  }
  return len;
}

wmmhello's avatar
wmmhello 已提交
1667 1668 1669 1670 1671 1672 1673 1674
/*
 * Rewrite the JSON columns of the current batch as printable strings.
 *
 * When at least one field has type TSDB_DATA_TYPE_JSON, a new block is built with
 * the same layout as pResultInfo->pData. JSON values are replaced by their text
 * form; all other columns are copied unchanged. The new block becomes both
 * pResultInfo->convertJson and pResultInfo->pData. When no JSON field exists,
 * nothing is touched.
 *
 * The block is assembled in a fresh buffer and swapped in only at the end. The
 * buffer left from a previous batch is freed at that point, so it is no longer
 * leaked on every call, and it remains valid while it may still be read as input.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the new block cannot
 * be allocated (in which case pResultInfo is left unchanged).
 */
static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int32_t numOfRows) {
  bool needConvert = false;
  for (int32_t i = 0; i < numOfCols; ++i) {
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
      needConvert = true;
      break;
    }
  }

  if (!needConvert) {
    return TSDB_CODE_SUCCESS;
  }

  tscDebug("start to convert form json format string");

  char*   p = (char*)pResultInfo->pData;
  int32_t dataLen = estimateJsonLen(pResultInfo, numOfCols, numOfRows);

  char* pNewBlock = taosMemoryCalloc(1, dataLen);
  if (pNewBlock == NULL) return TSDB_CODE_OUT_OF_MEMORY;
  char* p1 = pNewBlock;

  // fixed header and per-column schema are copied verbatim
  int32_t totalLen = 0;
  int32_t len = getVersion1BlockMetaSize(p, numOfCols);
  memcpy(p1, p, len);

  p += len;
  p1 += len;
  totalLen += len;

  // per-column length array; the JSON entries are patched below
  len = sizeof(int32_t) * numOfCols;
  int32_t* colLength = (int32_t*)p;
  int32_t* colLength1 = (int32_t*)p1;
  memcpy(p1, p, len);
  p += len;
  p1 += len;
  totalLen += len;

  char* pStart = p;
  char* pStart1 = p1;
  for (int32_t i = 0; i < numOfCols; ++i) {
    int32_t colLen = htonl(colLength[i]);
    int32_t colLen1 = htonl(colLength1[i]);
    ASSERT(colLen < dataLen);

    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
      int32_t* offset = (int32_t*)pStart;
      int32_t* offset1 = (int32_t*)pStart1;
      len = numOfRows * sizeof(int32_t);
      memcpy(pStart1, pStart, len);
      pStart += len;
      pStart1 += len;
      totalLen += len;

      // from here on len is the number of payload bytes written for this column
      len = 0;
      for (int32_t j = 0; j < numOfRows; ++j) {
        if (offset[j] == -1) {  // row has no value; the copied offset stays -1
          continue;
        }
        char* data = offset[j] + pStart;

        // first byte is the inner type tag, the payload follows it
        int32_t jsonInnerType = *data;
        char*   jsonInnerData = data + CHAR_BYTES;
        char    dst[TSDB_MAX_JSON_TAG_LEN] = {0};
        if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
          sprintf(varDataVal(dst), "%s", TSDB_DATA_NULL_STR_L);
          varDataSetLen(dst, strlen(varDataVal(dst)));
        } else if (tTagIsJson(data)) {
          char* jsonString = parseTagDatatoJson(data);
          STR_TO_VARSTR(dst, jsonString);
          taosMemoryFree(jsonString);
        } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
          *(char*)varDataVal(dst) = '\"';
          int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData),
                                         varDataVal(dst) + CHAR_BYTES);
          if (length <= 0) {
            tscError("charset:%s to %s. convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset);
            length = 0;
          }
          varDataSetLen(dst, length + CHAR_BYTES * 2);
          *(char*)POINTER_SHIFT(varDataVal(dst), length + CHAR_BYTES) = '\"';
        } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
          double jsonVd = *(double*)(jsonInnerData);
          sprintf(varDataVal(dst), "%.9lf", jsonVd);
          varDataSetLen(dst, strlen(varDataVal(dst)));
        } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
          sprintf(varDataVal(dst), "%s", (*((char*)jsonInnerData) == 1) ? "true" : "false");
          varDataSetLen(dst, strlen(varDataVal(dst)));
        } else {
          ASSERT(0);
        }

        offset1[j] = len;
        memcpy(pStart1 + len, dst, varDataTLen(dst));
        len += varDataTLen(dst);
      }
      colLen1 = len;
      totalLen += colLen1;
      colLength1[i] = htonl(len);
    } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
      // offset array followed by the payload, both copied unchanged
      len = numOfRows * sizeof(int32_t);
      memcpy(pStart1, pStart, len);
      pStart += len;
      pStart1 += len;
      totalLen += len;
      totalLen += colLen;
      memcpy(pStart1, pStart, colLen);
    } else {
      // null bitmap followed by the payload, both copied unchanged
      len = BitmapLen(pResultInfo->numOfRows);
      memcpy(pStart1, pStart, len);
      pStart += len;
      pStart1 += len;
      totalLen += len;
      totalLen += colLen;
      memcpy(pStart1, pStart, colLen);
    }
    pStart += colLen;
    pStart1 += colLen1;
  }

  // the second int32 of the header carries the total block length
  *(int32_t*)(pNewBlock + 4) = totalLen;

  // release the block of the previous batch only now that it is no longer read
  if (pResultInfo->convertJson != NULL) {
    taosMemoryFree(pResultInfo->convertJson);
  }
  pResultInfo->convertJson = pNewBlock;
  pResultInfo->pData = pResultInfo->convertJson;
  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1792 1793
/*
 * Point the per-column accessors of pResultInfo (pCol, row, length) at the data
 * block currently referenced by pResultInfo->pData.
 *
 * pResultInfo  result to set up; must not be NULL
 * pFields      column schema; must not be NULL
 * numOfCols    number of columns, must be > 0 and match the block header
 * numOfRows    number of rows; 0 returns immediately with success
 * convertUcs4  when true, NCHAR columns are converted through doConvertUCS4()
 *
 * The per-column lengths stored in the block are converted with htonl() in place.
 *
 * Returns TSDB_CODE_SUCCESS or the error code of the failing preparation or
 * conversion step.
 */
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows,
                         bool convertUcs4) {
  assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL);
  if (numOfRows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = doPrepareResPtr(pResultInfo);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // may replace pResultInfo->pData, so the block pointer is read only afterwards
  code = doConvertJson(pResultInfo, numOfCols, numOfRows);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  char* pCursor = (char*)pResultInfo->pData;

  // block version: not needed here
  pCursor += sizeof(int32_t);

  // total block length, used to sanity-check the column lengths below
  int32_t blockLen = *(int32_t*)pCursor;
  pCursor += sizeof(int32_t);

  int32_t rowsInBlock = *(int32_t*)pCursor;
  pCursor += sizeof(int32_t);

  int32_t colsInBlock = *(int32_t*)pCursor;
  pCursor += sizeof(int32_t);

  ASSERT(rowsInBlock == numOfRows && colsInBlock == numOfCols);

  // column-segment flag and group id: not needed here
  pCursor += sizeof(int32_t);
  pCursor += sizeof(uint64_t);

  // per-column schema entries (1-byte type, 4-byte width) are skipped unchecked
  for (int32_t i = 0; i < numOfCols; ++i) {
    pCursor += sizeof(int8_t);
    pCursor += sizeof(int32_t);
  }

  int32_t* colLength = (int32_t*)pCursor;
  pCursor += sizeof(int32_t) * numOfCols;

  char* pColStart = pCursor;
  for (int32_t i = 0; i < numOfCols; ++i) {
    colLength[i] = htonl(colLength[i]);
    if (colLength[i] >= blockLen) {
      tscError("invalid colLength %d, dataLen %d", colLength[i], blockLen);
      ASSERT(0);
    }

    SResultColumn* pCol = &pResultInfo->pCol[i];

    // a column starts with either a per-row offset array (variable-length types)
    // or a null bitmap (fixed-length types), followed by the payload
    if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
      pCol->offset = (int32_t*)pColStart;
      pColStart += numOfRows * sizeof(int32_t);
    } else {
      pCol->nullbitmap = pColStart;
      pColStart += BitmapLen(pResultInfo->numOfRows);
    }

    pCol->pData = pColStart;
    pResultInfo->length[i] = pResultInfo->fields[i].bytes;
    pResultInfo->row[i] = pCol->pData;

    pColStart += colLength[i];
  }

  if (convertUcs4) {
    code = doConvertUCS4(pResultInfo, numOfRows, numOfCols, colLength);
  }

  return code;
}

H
Haojun Liao 已提交
1875
/*
 * Return a heap copy (strndup) of the connection's current database name, taken
 * while holding the connection mutex, or NULL when the name is empty.
 * The caller owns the returned string.
 */
char* getDbOfConnection(STscObj* pObj) {
  char* pDbCopy = NULL;

  taosThreadMutexLock(&pObj->mutex);
  if (pObj->db[0] != '\0') {
    pDbCopy = strndup(pObj->db, tListLen(pObj->db));
  }
  taosThreadMutexUnlock(&pObj->mutex);

  return pDbCopy;
}

/*
 * Store db as the connection's current database name, copying at most the
 * capacity of pTscObj->db while holding the connection mutex.
 * Both arguments must be non-NULL.
 */
void setConnectionDB(STscObj* pTscObj, const char* db) {
  assert(db != NULL && pTscObj != NULL);

  taosThreadMutexLock(&pTscObj->mutex);
  {
    // bounded by the number of elements of the destination array
    tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
  }
  taosThreadMutexUnlock(&pTscObj->mutex);
}
S
Shengliang Guan 已提交
1893

H
Haojun Liao 已提交
1894 1895 1896 1897 1898 1899 1900 1901 1902 1903
/*
 * Clear the connection's current database name (first byte set to 0) while
 * holding the connection mutex. A NULL connection is ignored.
 */
void resetConnectDB(STscObj* pTscObj) {
  if (pTscObj != NULL) {
    taosThreadMutexLock(&pTscObj->mutex);
    pTscObj->db[0] = 0;
    taosThreadMutexUnlock(&pTscObj->mutex);
  }
}

L
Liu Jicong 已提交
1904 1905
/*
 * Install a retrieve response as the current batch of pResultInfo and set up the
 * per-column accessors for it.
 *
 * pResultInfo   result to update; must not be NULL
 * pRsp          response message; kept by pointer in pResultInfo->pRspMsg
 * convertUcs4   forwarded to setResultDataPtr()
 * freeAfterUse  when true, the previously stored response message is freed first
 *
 * Returns the code of setResultDataPtr().
 */
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4,
                              bool freeAfterUse) {
  assert(pResultInfo != NULL && pRsp != NULL);

  if (freeAfterUse) {
    taosMemoryFreeClear(pResultInfo->pRspMsg);
  }

  // message and payload pointers
  pResultInfo->pRspMsg = (const char*)pRsp;
  pResultInfo->pData = (void*)pRsp->data;

  // batch metadata; the multi-byte counters go through htonl()
  pResultInfo->numOfRows = htonl(pRsp->numOfRows);
  pResultInfo->payloadLen = htonl(pRsp->compLen);
  pResultInfo->precision = pRsp->precision;
  pResultInfo->completed = (pRsp->completed == 1);

  // restart the row cursor for the new batch
  pResultInfo->current = 0;

  // TODO handle the compressed case
  pResultInfo->totalRows += pResultInfo->numOfRows;

  return setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows,
                          convertUcs4);
}
S
Shengliang Guan 已提交
1923 1924 1925 1926 1927 1928

TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* details, int maxlen) {
  TSDB_SERVER_STATUS code = TSDB_SRV_STATUS_UNAVAILABLE;
  void*              clientRpc = NULL;
  SServerStatusRsp   statusRsp = {0};
  SEpSet             epSet = {.inUse = 0, .numOfEps = 1};
S
Shengliang Guan 已提交
1929
  SRpcMsg            rpcMsg = {.info.ahandle = (void*)0x9526, .msgType = TDMT_DND_SERVER_STATUS};
S
Shengliang Guan 已提交
1930 1931 1932 1933 1934 1935 1936 1937 1938 1939 1940 1941 1942 1943 1944 1945 1946 1947
  SRpcMsg            rpcRsp = {0};
  SRpcInit           rpcInit = {0};
  char               pass[TSDB_PASSWORD_LEN + 1] = {0};

  rpcInit.label = "CHK";
  rpcInit.numOfThreads = 1;
  rpcInit.cfp = NULL;
  rpcInit.sessions = 16;
  rpcInit.connType = TAOS_CONN_CLIENT;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.user = "_dnd";

  clientRpc = rpcOpen(&rpcInit);
  if (clientRpc == NULL) {
    tscError("failed to init server status client");
    goto _OVER;
  }

S
Shengliang Guan 已提交
1948 1949 1950 1951 1952 1953 1954 1955
  if (fqdn == NULL) {
    fqdn = tsLocalFqdn;
  }

  if (port == 0) {
    port = tsServerPort;
  }

S
Shengliang Guan 已提交
1956 1957 1958 1959 1960 1961 1962 1963 1964 1965 1966 1967 1968 1969 1970
  tstrncpy(epSet.eps[0].fqdn, fqdn, TSDB_FQDN_LEN);
  epSet.eps[0].port = (uint16_t)port;
  rpcSendRecv(clientRpc, &epSet, &rpcMsg, &rpcRsp);

  if (rpcRsp.code != 0 || rpcRsp.contLen <= 0 || rpcRsp.pCont == NULL) {
    tscError("failed to send server status req since %s", terrstr());
    goto _OVER;
  }

  if (tDeserializeSServerStatusRsp(rpcRsp.pCont, rpcRsp.contLen, &statusRsp) != 0) {
    tscError("failed to parse server status rsp since %s", terrstr());
    goto _OVER;
  }

  code = statusRsp.statusCode;
1971
  if (details != NULL) {
S
Shengliang Guan 已提交
1972 1973 1974 1975 1976 1977 1978 1979 1980 1981 1982 1983
    tstrncpy(details, statusRsp.details, maxlen);
  }

_OVER:
  if (clientRpc != NULL) {
    rpcClose(clientRpc);
  }
  if (rpcRsp.pCont != NULL) {
    rpcFreeCont(rpcRsp.pCont);
  }
  return code;
}
D
dapan1121 已提交
1984

1985
/*
 * Add one table name, cut out of str, to the per-database request map.
 *
 * pHash       map from full database name ("<acctId>.<db>") to STablesReq
 * pos1, len1  first name segment inside str; must have len1 > 0
 * pos2, len2  second name segment; when len2 > 0 the first segment is the
 *             database and the second the table, otherwise the first segment is
 *             the table and db supplies the database
 * str         text the segments refer to
 * acctId      account id used to build the full database name
 * db          default database name; required only when len2 <= 0
 *
 * Returns TSDB_CODE_SUCCESS, -1 for an invalid or over-long name (including a
 * missing default database), or TSDB_CODE_OUT_OF_MEMORY when the request entry
 * cannot be stored.
 */
int32_t appendTbToReq(SHashObj* pHash, int32_t pos1, int32_t len1, int32_t pos2, int32_t len2, const char* str,
                      int32_t acctId, char* db) {
  SName name = {0};

  if (len1 <= 0) {
    return -1;
  }

  const char* dbName = db;
  const char* tbName = NULL;
  int32_t     dbLen = 0;
  int32_t     tbLen = 0;
  if (len2 > 0) {
    dbName = str + pos1;
    dbLen = len1;
    tbName = str + pos2;
    tbLen = len2;
  } else {
    if (db == NULL) {
      // an unqualified table name cannot be resolved without a default database
      return -1;
    }
    dbLen = strlen(db);
    tbName = str + pos1;
    tbLen = len1;
  }

  if (dbLen <= 0 || tbLen <= 0) {
    return -1;
  }

  if (tNameSetDbName(&name, acctId, dbName, dbLen)) {
    return -1;
  }

  if (tNameAddTbName(&name, tbName, tbLen)) {
    return -1;
  }

  // bounded formatting: a name that does not fit is rejected instead of overflowing
  char    dbFName[TSDB_DB_FNAME_LEN];
  int32_t written = snprintf(dbFName, sizeof(dbFName), "%d.%.*s", acctId, dbLen, dbName);
  if (written < 0 || written >= (int32_t)sizeof(dbFName)) {
    return -1;
  }

  size_t      keyLen = strlen(dbFName);
  STablesReq* pDb = taosHashGet(pHash, dbFName, keyLen);
  if (pDb != NULL) {
    if (taosArrayPush(pDb->pTables, &name) == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }
    return TSDB_CODE_SUCCESS;
  }

  // first table of this database: create the entry, fully initialised
  STablesReq req = {0};
  req.pTables = taosArrayInit(20, sizeof(SName));
  if (req.pTables == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  tstrncpy(req.dbFName, dbFName, sizeof(req.dbFName));

  if (taosArrayPush(req.pTables, &name) == NULL || taosHashPut(pHash, dbFName, keyLen, &req, sizeof(req)) != 0) {
    // the map did not take the entry, so the array is still owned here
    taosArrayDestroy(req.pTables);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Parse a comma-separated list of table names ("tb", "db.tb", with optional
 * back-quoted segments) and group the tables by database.
 *
 * tbList  NUL-terminated list text
 * acctId  account id used to build full database names
 * dbName  default database for unqualified table names
 * pReq    on success receives a new array of STablesReq, one per database
 *
 * Returns TSDB_CODE_SUCCESS on success. On failure terrno is set and returned:
 * TSDB_CODE_OUT_OF_MEMORY when the map or the result array cannot be allocated,
 * TSDB_CODE_TSC_INVALID_OPERATION for every other error (malformed list or a
 * name rejected by appendTbToReq). On failure the per-database table arrays
 * collected so far are destroyed.
 */
int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq) {
  SHashObj* pHash = taosHashInit(3, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (NULL == pHash) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return terrno;
  }

  bool    inEscape = false;
  int32_t code = 0;
  int32_t errCode = TSDB_CODE_TSC_INVALID_OPERATION;  // reported at _return unless overridden
  void*   pIter = NULL;

  // vPos/vLen describe up to two name segments (db, table) of the current entry;
  // vIdx selects the segment being scanned. A position of -1 means "not started".
  int32_t vIdx = 0;
  int32_t vPos[2];
  int32_t vLen[2];

  memset(vPos, -1, sizeof(vPos));
  memset(vLen, 0, sizeof(vLen));

  for (int32_t i = 0;; ++i) {
    char c = *(tbList + i);

    if (0 == c) {
      // end of input closes the open segment and flushes the last entry
      if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
        vLen[vIdx] = i - vPos[vIdx];
      }

      code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
      if (code) {
        goto _return;
      }

      break;
    }

    if ('`' == c) {
      inEscape = !inEscape;
      if (!inEscape) {
        // closing back-quote: the quoted segment must not be empty
        if (vPos[vIdx] >= 0) {
          vLen[vIdx] = i - vPos[vIdx];
        } else {
          goto _return;
        }
      }

      continue;
    }

    if (inEscape) {
      // inside back-quotes every character belongs to the segment
      if (vPos[vIdx] < 0) {
        vPos[vIdx] = i;
      }
      continue;
    }

    if ('.' == c) {
      if (vPos[vIdx] < 0) {
        goto _return;
      }
      if (vLen[vIdx] <= 0) {
        vLen[vIdx] = i - vPos[vIdx];
      }
      vIdx++;
      if (vIdx >= 2) {  // at most "db.table"
        goto _return;
      }
      continue;
    }

    if (',' == c) {
      if (vPos[vIdx] < 0) {
        goto _return;
      }
      if (vLen[vIdx] <= 0) {
        vLen[vIdx] = i - vPos[vIdx];
      }

      code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
      if (code) {
        goto _return;
      }

      // start a new entry
      memset(vPos, -1, sizeof(vPos));
      memset(vLen, 0, sizeof(vLen));
      vIdx = 0;
      continue;
    }

    if (' ' == c || '\r' == c || '\t' == c || '\n' == c) {
      // whitespace ends the segment being scanned, if any
      if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
        vLen[vIdx] = i - vPos[vIdx];
      }
      continue;
    }

    if (('a' <= c && 'z' >= c) || ('A' <= c && 'Z' >= c) || ('0' <= c && '9' >= c) || ('_' == c)) {
      if (vLen[vIdx] > 0) {  // more name characters after the segment was closed
        goto _return;
      }
      if (vPos[vIdx] < 0) {
        vPos[vIdx] = i;
      }
      continue;
    }

    goto _return;
  }

  int32_t dbNum = taosHashGetSize(pHash);
  *pReq = taosArrayInit(dbNum, sizeof(STablesReq));
  if (NULL == *pReq) {
    // nothing was handed over yet, so the common cleanup releases the table arrays
    errCode = TSDB_CODE_OUT_OF_MEMORY;
    goto _return;
  }

  // the entries are copied by value, so the table arrays now belong to *pReq
  pIter = taosHashIterate(pHash, NULL);
  while (pIter) {
    STablesReq* pDb = (STablesReq*)pIter;
    taosArrayPush(*pReq, pDb);
    pIter = taosHashIterate(pHash, pIter);
  }

  taosHashCleanup(pHash);

  return TSDB_CODE_SUCCESS;

_return:

  terrno = errCode;

  pIter = taosHashIterate(pHash, NULL);
  while (pIter) {
    STablesReq* pDb = (STablesReq*)pIter;
    taosArrayDestroy(pDb->pTables);
    pIter = taosHashIterate(pHash, pIter);
  }

  taosHashCleanup(pHash);

  return terrno;
}

/*
 * Completion callback: store code in the request referenced by param and post
 * the semaphore carried by param. pResult is not used.
 */
void syncCatalogFn(SMetaData* pResult, void* param, int32_t code) {
  SSyncQueryParam* pSync = (SSyncQueryParam*)param;

  pSync->pRequest->code = code;
  tsem_post(&pSync->sem);
}

X
Xiaoyu Wang 已提交
2179 2180
void syncQueryFn(void* param, void* res, int32_t code) {
  SSyncQueryParam* pParam = param;
D
dapan1121 已提交
2181
  pParam->pRequest = res;
H
Haojun Liao 已提交
2182

D
dapan1121 已提交
2183 2184 2185
  if (pParam->pRequest) {
    pParam->pRequest->code = code;
  }
D
dapan1121 已提交
2186 2187 2188 2189

  tsem_post(&pParam->sem);
}

2190 2191
/*
 * Build a request for sql on connection connId and start it asynchronously.
 *
 * connId        connection reference id
 * sql           statement text; must not be NULL and must not exceed
 *               TSDB_MAX_ALLOWED_SQL_LEN
 * fp            completion callback; must not be NULL
 * param         opaque value passed to fp
 * validateOnly  forwarded to buildRequest()
 *
 * Every failure sets terrno. When a callback is available it is also invoked
 * as fp(param, NULL, terrno). Without a callback there is nobody to notify, so
 * the function only sets terrno and returns.
 */
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly) {
  if (NULL == fp) {
    // calling through a NULL function pointer would crash; report via terrno only
    terrno = TSDB_CODE_INVALID_PARA;
    return;
  }

  if (sql == NULL) {
    terrno = TSDB_CODE_INVALID_PARA;
    fp(param, NULL, terrno);
    return;
  }

  size_t sqlLen = strlen(sql);
  if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
    tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    fp(param, NULL, terrno);
    return;
  }

  SRequestObj* pRequest = NULL;
  int32_t      code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    fp(param, NULL, terrno);
    return;
  }

  pRequest->body.queryFp = fp;
  doAsyncQuery(pRequest, false);
}

X
Xiaoyu Wang 已提交
2217
/*
 * Execute sql synchronously on connection taos.
 *
 * taos          connection handle; NULL yields TSDB_CODE_TSC_DISCONNECTED
 * sql           statement text
 * validateOnly  forwarded to the query path
 *
 * Returns the result object, or NULL with terrno set when no request could be
 * produced.
 *
 * With SYNC_ON_TOP_OF_ASYNC the query runs through taosAsyncQueryImpl() and this
 * function blocks on a semaphore until syncQueryFn posts it.
 */
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) {
  if (NULL == taos) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }

#if SYNC_ON_TOP_OF_ASYNC
  SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
  if (NULL == param) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  tsem_init(&param->sem, 0, 0);

  taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly);
  tsem_wait(&param->sem);

  if (param->pRequest != NULL) {
    param->pRequest->syncQuery = true;
    return param->pRequest;
  }

  // The callback reported no request object, so nothing else references param
  // (NOTE(review): relies on the request owning param only when one was created).
  // Release it here instead of leaking it together with its semaphore.
  tsem_destroy(&param->sem);
  taosMemoryFree(param);
  return NULL;
#else
  if (NULL == sql) {
    terrno = TSDB_CODE_INVALID_PARA;
    return NULL;
  }

  size_t sqlLen = strlen(sql);
  if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
    tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    return NULL;
  }

  TAOS_RES* pRes = execQuery(*(int64_t*)taos, sql, sqlLen, validateOnly);
  return pRes;
#endif
}