clientImpl.c 45.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
15

16
#include "cJSON.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "command.h"
20
#include "scheduler.h"
H
Haojun Liao 已提交
21
#include "tdatablock.h"
22
#include "tdataformat.h"
23
#include "tdef.h"
24
#include "tglobal.h"
25
#include "tmsgtype.h"
H
Haojun Liao 已提交
26
#include "tpagedbuf.h"
27
#include "tref.h"
X
Xiaoyu Wang 已提交
28

S
Shengliang Guan 已提交
29
static int32_t       initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
30
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest);
dengyihao's avatar
dengyihao 已提交
31
static void          destroySendMsgInfo(SMsgSendInfo* pMsgBody);
H
Haojun Liao 已提交
32

33
/**
 * Check that a string is present, non-empty and not longer than a limit.
 *
 * @param str      NUL-terminated string to check; may be NULL.
 * @param maxsize  largest accepted length in bytes, not counting the terminator.
 * @return true when str is non-NULL and 1 <= strlen(str) <= maxsize; false otherwise.
 */
static bool stringLengthCheck(const char* str, size_t maxsize) {
  if (str == NULL) {
    return false;
  }

  // size_t is unsigned, so the only "too short" case is an empty string.
  size_t len = strlen(str);
  return len != 0 && len <= maxsize;
}

dengyihao's avatar
dengyihao 已提交
46
// A user name is accepted when it is non-empty and leaves room for the terminator in TSDB_USER_LEN bytes.
static bool validateUserName(const char* user) {
  bool ok = stringLengthCheck(user, TSDB_USER_LEN - 1);
  return ok;
}
47

dengyihao's avatar
dengyihao 已提交
48
// A password is accepted when it is non-empty and leaves room for the terminator in TSDB_PASSWORD_LEN bytes.
static bool validatePassword(const char* passwd) {
  bool ok = stringLengthCheck(passwd, TSDB_PASSWORD_LEN - 1);
  return ok;
}
49

dengyihao's avatar
dengyihao 已提交
50
// A database name is accepted when it is non-empty and leaves room for the terminator in TSDB_DB_NAME_LEN bytes.
static bool validateDbName(const char* db) {
  bool ok = stringLengthCheck(db, TSDB_DB_NAME_LEN - 1);
  return ok;
}
51

52 53 54 55 56 57
/**
 * Build the "user:auth:ip:port" string used as the lookup key for an application instance.
 *
 * @param user  user name (must not be NULL).
 * @param auth  encrypted secret (must not be NULL).
 * @param ip    endpoint given by the caller; may be NULL, in which case an empty
 *              string is used in its place.
 * @param port  port given by the caller.
 * @return a heap copy of the key made with strdup() that the caller must free,
 *         or NULL if the copy cannot be allocated. The key is truncated to 511 bytes.
 */
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
  char key[512] = {0};

  // taos_connect_internal() forwards ip unchanged, and it is NULL when the
  // configured endpoints are used; passing NULL for "%s" is undefined behaviour.
  const char* host = (ip != NULL) ? ip : "";

  snprintf(key, sizeof(key), "%s:%s:%s:%d", user, auth, host, port);
  return strdup(key);
}

dengyihao's avatar
dengyihao 已提交
58
static STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
L
Liu Jicong 已提交
59
                                SAppInstInfo* pAppInfo, int connType);
60

dengyihao's avatar
dengyihao 已提交
61
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
L
Liu Jicong 已提交
62
                            uint16_t port, int connType) {
63 64 65 66
  if (taos_init() != TSDB_CODE_SUCCESS) {
    return NULL;
  }

67 68 69 70 71
  if (!validateUserName(user)) {
    terrno = TSDB_CODE_TSC_INVALID_USER_LENGTH;
    return NULL;
  }

72
  char localDb[TSDB_DB_NAME_LEN] = {0};
H
Haojun Liao 已提交
73
  if (db != NULL && strlen(db) > 0) {
dengyihao's avatar
dengyihao 已提交
74
    if (!validateDbName(db)) {
75 76 77 78
      terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH;
      return NULL;
    }

79 80
    tstrncpy(localDb, db, sizeof(localDb));
    strdequote(localDb);
81 82
  }

83
  char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
84 85 86 87 88 89
  if (auth == NULL) {
    if (!validatePassword(pass)) {
      terrno = TSDB_CODE_TSC_INVALID_PASS_LENGTH;
      return NULL;
    }

dengyihao's avatar
dengyihao 已提交
90
    taosEncryptPass_c((uint8_t*)pass, strlen(pass), secretEncrypt);
91 92 93 94
  } else {
    tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt));
  }

95
  SCorEpSet epSet = {0};
S
Shengliang Guan 已提交
96 97 98 99 100 101 102 103 104
  if (ip) {
    if (initEpSetFromCfg(ip, NULL, &epSet) < 0) {
      return NULL;
    }
  } else {
    if (initEpSetFromCfg(tsFirst, tsSecond, &epSet) < 0) {
      return NULL;
    }
  }
105

106 107
  if (port) {
    epSet.epSet.eps[0].port = port;
S
Shengliang Guan 已提交
108
    epSet.epSet.eps[1].port = port;
109 110
  }

111
  char* key = getClusterKey(user, secretEncrypt, ip, port);
112

113
  SAppInstInfo** pInst = NULL;
wafwerar's avatar
wafwerar 已提交
114
  taosThreadMutexLock(&appInfo.mutex);
H
Haojun Liao 已提交
115 116

  pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
L
Liu Jicong 已提交
117
  SAppInstInfo* p = NULL;
118
  if (pInst == NULL) {
wafwerar's avatar
wafwerar 已提交
119
    p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo));
dengyihao's avatar
dengyihao 已提交
120
    p->mgmtEp = epSet;
D
dapan1121 已提交
121
    taosThreadMutexInit(&p->qnodeMutex, NULL);
H
Haojun Liao 已提交
122
    p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
D
dapan 已提交
123
    p->pAppHbMgr = appHbMgrInit(p, key);
H
Haojun Liao 已提交
124
    taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
125

H
Haojun Liao 已提交
126
    pInst = &p;
127 128
  }

wafwerar's avatar
wafwerar 已提交
129
  taosThreadMutexUnlock(&appInfo.mutex);
H
Haojun Liao 已提交
130

wafwerar's avatar
wafwerar 已提交
131
  taosMemoryFreeClear(key);
L
Liu Jicong 已提交
132
  return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst, connType);
133 134
}

dengyihao's avatar
dengyihao 已提交
135
/**
 * Create a request object for a SQL statement and register it in the
 * connection's request table.
 *
 * The statement is copied into the request through strntolower() and
 * NUL-terminated.
 *
 * @param pTscObj   connection that owns the request.
 * @param sql       statement text; sqlLen bytes are read.
 * @param sqlLen    length of sql in bytes.
 * @param pRequest  out: the new request on success. It is NULL on every failure path.
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY when the request,
 *         its SQL buffer, or the hash entry cannot be created.
 */
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest) {
  *pRequest = createRequest(pTscObj, NULL, TSDB_SQL_SELECT);
  if (*pRequest == NULL) {
    tscError("failed to malloc sqlObj");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  (*pRequest)->sqlstr = taosMemoryMalloc(sqlLen + 1);
  if ((*pRequest)->sqlstr == NULL) {
    tscError("0x%" PRIx64 " failed to prepare sql string buffer", (*pRequest)->self);
    // Release the request here, as the hash-insert failure path below does;
    // launchQuery() returns NULL on failure without destroying it.
    destroyRequest(*pRequest);
    *pRequest = NULL;
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen);
  (*pRequest)->sqlstr[sqlLen] = 0;
  (*pRequest)->sqlLen = sqlLen;

  // the request's own id (self) is used as both key and value of the entry
  if (taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self,
                  sizeof((*pRequest)->self))) {
    destroyRequest(*pRequest);
    *pRequest = NULL;
    tscError("put request to request hash failed");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  tscDebugL("0x%" PRIx64 " SQL: %s, reqId:0x%" PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId);
  return TSDB_CODE_SUCCESS;
}
164

D
stmt  
dapan1121 已提交
165
/**
 * Parse the SQL text stored in a request.
 *
 * On success with a result set, the result schema and precision are copied
 * into the request. On success, or on an error for which
 * NEED_CLIENT_HANDLE_ERROR() is true, the db/table lists of the parsed query
 * are swapped into the request.
 *
 * @param pRequest    request holding sqlstr/sqlLen and the message buffer.
 * @param topicQuery  forwarded to the parser context.
 * @param pQuery      out: parsed query produced by qParseSql().
 * @param pStmtCb     statement callback forwarded to the parser context.
 * @return the code from catalogGetHandle() if it fails, otherwise the code from qParseSql().
 */
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb) {
  STscObj*      pConn = pRequest->pTscObj;
  SAppInstInfo* pApp = pConn->pAppInfo;

  SParseContext cxt = {.requestId = pRequest->requestId,
                       .acctId = pConn->acctId,
                       .db = pRequest->pDb,
                       .topicQuery = topicQuery,
                       .pSql = pRequest->sqlstr,
                       .sqlLen = pRequest->sqlLen,
                       .pMsg = pRequest->msgBuf,
                       .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
                       .pTransporter = pApp->pTransporter,
                       .pStmtCb = pStmtCb,
                       .pUser = pConn->user,
                       .isSuperUser = (strcmp(pConn->user, TSDB_DEFAULT_USER) == 0)};
  cxt.mgmtEpSet = getEpSet_s(&pApp->mgmtEp);

  int32_t code = catalogGetHandle(pApp->clusterId, &cxt.pCatalog);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  code = qParseSql(&cxt, pQuery);

  bool parsed = (TSDB_CODE_SUCCESS == code);
  if (parsed && (*pQuery)->haveResultSet) {
    SReqResultInfo* pResInfo = &pRequest->body.resInfo;
    setResSchemaInfo(pResInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols);
    setResPrecision(pResInfo, (*pQuery)->precision);
  }

  if (parsed || NEED_CLIENT_HANDLE_ERROR(code)) {
    TSWAP(pRequest->dbList, (*pQuery)->pDbList);
    TSWAP(pRequest->tableList, (*pQuery)->pTableList);
  }

  return code;
}
H
Haojun Liao 已提交
202

203 204
/**
 * Run a command through qExecCommand() and, when it yields a response,
 * load that response into the request's result info.
 *
 * @return the code from qExecCommand() when it fails or produces no response,
 *         otherwise the code from setQueryResultFromRsp().
 */
int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
  SRetrieveTableRsp* pRetrieveRsp = NULL;

  int32_t code = qExecCommand(pQuery->pRoot, &pRetrieveRsp);
  if (TSDB_CODE_SUCCESS != code || NULL == pRetrieveRsp) {
    return code;
  }

  return setQueryResultFromRsp(&pRequest->body.resInfo, pRetrieveRsp, false, false);
}

X
Xiaoyu Wang 已提交
212
/**
 * Send the command message of a parsed query to the server and block on the
 * request's response semaphore.
 *
 * A query without a command message (e.g. "drop table if exists
 * not_exists_table") is a no-op.
 *
 * @return always TSDB_CODE_SUCCESS.
 */
int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
  SCmdMsgInfo* pCmd = pQuery->pCmdMsg;
  if (pCmd == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pRequest->type = pCmd->msgType;

  SDataBuf reqMsg = {.pData = pCmd->pMsg, .len = pCmd->msgLen, .handle = NULL};
  pRequest->body.requestMsg = reqMsg;
  pCmd->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management

  STscObj*      pConn = pRequest->pTscObj;
  SMsgSendInfo* pSendInfo = buildMsgInfoImpl(pRequest);

  int64_t transporterId = 0;
  asyncSendMsgToServer(pConn->pAppInfo->pTransporter, &pCmd->epSet, &transporterId, pSendInfo);

  tsem_wait(&pRequest->body.rspSem);
  return TSDB_CODE_SUCCESS;
}
X
Xiaoyu Wang 已提交
232

233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255
// Return the application instance of the connection that owns the request.
static SAppInstInfo* getAppInfo(SRequestObj* pRequest) {
  STscObj* pConn = pRequest->pTscObj;
  return pConn->pAppInfo;
}

/**
 * Send the command message of a parsed query to the server and return
 * without waiting for the response.
 *
 * A query without a command message (e.g. "drop table if exists
 * not_exists_table") is a no-op.
 *
 * @return always TSDB_CODE_SUCCESS.
 */
int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
  SCmdMsgInfo* pCmd = pQuery->pCmdMsg;
  if (pCmd == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pRequest->type = pCmd->msgType;

  SDataBuf reqMsg = {.pData = pCmd->pMsg, .len = pCmd->msgLen, .handle = NULL};
  pRequest->body.requestMsg = reqMsg;
  pCmd->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management

  SAppInstInfo* pApp = getAppInfo(pRequest);
  SMsgSendInfo* pSendInfo = buildMsgInfoImpl(pRequest);

  int64_t transporterId = 0;
  asyncSendMsgToServer(pApp->pTransporter, &pCmd->epSet, &transporterId, pSendInfo);
  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
256
int compareQueryNodeLoad(const void* elem1, const void* elem2) {
L
Liu Jicong 已提交
257 258
  SQueryNodeLoad* node1 = (SQueryNodeLoad*)elem1;
  SQueryNodeLoad* node2 = (SQueryNodeLoad*)elem2;
D
dapan1121 已提交
259 260 261 262

  if (node1->load < node2->load) {
    return -1;
  }
L
Liu Jicong 已提交
263

D
dapan1121 已提交
264 265 266
  return node1->load > node2->load;
}

L
Liu Jicong 已提交
267
/**
 * Replace the cached qnode list of an application instance.
 *
 * Under qnodeMutex the old list is destroyed; when pNodeList is non-NULL a
 * duplicate of it is stored and sorted with compareQueryNodeLoad().
 *
 * @return always TSDB_CODE_SUCCESS.
 */
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
  taosThreadMutexLock(&pInfo->qnodeMutex);

  if (pInfo->pQnodeList != NULL) {
    taosArrayDestroy(pInfo->pQnodeList);
    pInfo->pQnodeList = NULL;
  }

  if (pNodeList != NULL) {
    SArray* pCopy = taosArrayDup(pNodeList);
    pInfo->pQnodeList = pCopy;
    taosArraySort(pCopy, compareQueryNodeLoad);
  }

  taosThreadMutexUnlock(&pInfo->qnodeMutex);
  return TSDB_CODE_SUCCESS;
}

/**
 * Produce a qnode list for a request.
 *
 * A duplicate of the cached list is returned when one exists. Otherwise, if
 * *pNodeList is still NULL, the list is fetched through the catalog and the
 * cache is refreshed with it.
 *
 * @param pNodeList  in/out: read after the cache lookup, so the caller is
 *                   expected to pass it pointing at NULL.
 * @return 0 / TSDB_CODE_SUCCESS, or the first failing catalog/update code.
 */
int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) {
  SAppInstInfo* pApp = pRequest->pTscObj->pAppInfo;
  int32_t       code = 0;

  taosThreadMutexLock(&pApp->qnodeMutex);
  if (pApp->pQnodeList != NULL) {
    *pNodeList = taosArrayDup(pApp->pQnodeList);
  }
  taosThreadMutexUnlock(&pApp->qnodeMutex);

  if (*pNodeList != NULL) {
    return code;
  }

  // nothing cached: ask the catalog
  SEpSet    mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
  SCatalog* pCatalog = NULL;

  code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
  if (code == TSDB_CODE_SUCCESS) {
    *pNodeList = taosArrayInit(5, sizeof(SQueryNodeLoad));
    code = catalogGetQnodeList(pCatalog, pRequest->pTscObj->pAppInfo->pTransporter, &mgmtEpSet, *pNodeList);
  }

  if (code == TSDB_CODE_SUCCESS && *pNodeList != NULL) {
    code = updateQnodeList(pApp, *pNodeList);
  }

  return code;
}

/**
 * Obtain the qnode list for a request and build the query plan for a parsed query.
 *
 * The request type is set from the query's message type first.
 *
 * @param pPlan      out: plan produced by qCreateQueryPlan().
 * @param pNodeList  in/out: list filled by getQnodeList() and passed to the planner.
 * @return the code from getQnodeList() if it fails, otherwise the code from qCreateQueryPlan().
 */
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray** pNodeList) {
  pRequest->type = pQuery->msgType;

  SAppInstInfo* pApp = getAppInfo(pRequest);
  SEpSet        mgmtEpSet = getEpSet_s(&pApp->mgmtEp);

  SPlanContext cxt = {.queryId = pRequest->requestId,
                      .acctId = pRequest->pTscObj->acctId,
                      .mgmtEpSet = mgmtEpSet,
                      .pAstRoot = pQuery->pRoot,
                      .showRewrite = pQuery->showRewrite,
                      .pMsg = pRequest->msgBuf,
                      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE};

  int32_t code = getQnodeList(pRequest, pNodeList);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  return qCreateQueryPlan(&cxt, pPlan, *pNodeList);
}

330
/**
 * Rebuild the field descriptors of a result set from a schema.
 *
 * Two arrays are allocated: `fields` mirrors the schema, while `userFields`
 * reports VARCHAR widths without VARSTR_HEADER_SIZE and NCHAR/JSON widths
 * additionally divided by TSDB_NCHAR_SIZE. Existing arrays are freed first.
 *
 * @param pSchema    schema columns; must be non-NULL (asserted).
 * @param numOfCols  number of columns; must be > 0 (asserted).
 */
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols) {
  ASSERT(pSchema != NULL && numOfCols > 0);

  pResInfo->numOfCols = numOfCols;

  if (pResInfo->fields != NULL) {
    taosMemoryFree(pResInfo->fields);
  }
  if (pResInfo->userFields != NULL) {
    taosMemoryFree(pResInfo->userFields);
  }

  pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
  pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));

  for (int32_t col = 0; col < pResInfo->numOfCols; ++col) {
    const SSchema* pCol = &pSchema[col];

    pResInfo->fields[col].bytes = pCol->bytes;
    pResInfo->fields[col].type = pCol->type;

    pResInfo->userFields[col].bytes = pCol->bytes;
    pResInfo->userFields[col].type = pCol->type;

    bool isVarchar = (pCol->type == TSDB_DATA_TYPE_VARCHAR);
    bool isWide = (pCol->type == TSDB_DATA_TYPE_NCHAR || pCol->type == TSDB_DATA_TYPE_JSON);

    if (isVarchar) {
      pResInfo->userFields[col].bytes -= VARSTR_HEADER_SIZE;
    } else if (isWide) {
      pResInfo->userFields[col].bytes = (pResInfo->userFields[col].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
    }

    tstrncpy(pResInfo->fields[col].name, pCol->name, tListLen(pResInfo->fields[col].name));
    tstrncpy(pResInfo->userFields[col].name, pCol->name, tListLen(pResInfo->userFields[col].name));
  }
}

361
// Store the time precision in the result info; values other than the milli,
// micro and nano precision constants are ignored.
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
  bool known = (precision == TSDB_TIME_PRECISION_MILLI) || (precision == TSDB_TIME_PRECISION_MICRO) ||
               (precision == TSDB_TIME_PRECISION_NANO);

  if (known) {
    pResInfo->precision = precision;
  }
}

D
dapan1121 已提交
370
/**
 * Start a scheduler job asynchronously and wait on schdRspSem for its result.
 *
 * If the launch fails, or the result reported through `res` carries a
 * non-zero code, the job (when one was created) is freed and that code is
 * stored in the request and in terrno. Otherwise, for submit and
 * create-table requests, the affected row count is copied into the request
 * and the job is freed.
 *
 * @return the code stored in pRequest->code.
 */
int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
  void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;

  tsem_init(&schdRspSem, 0, 0);

  SQueryResult res = {.code = 0, .numOfRows = 0};
  int32_t      code = schedulerAsyncExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr,
                                       pRequest->metric.start, schdExecCallback, &res);

  pRequest->body.resInfo.execRes = res.res;

  if (TSDB_CODE_SUCCESS == code) {
    // launched: block until the result is signalled on schdRspSem
    tsem_wait(&schdRspSem);
    if (res.code) {
      code = res.code;
    }
  }

  if (TSDB_CODE_SUCCESS != code) {
    if (pRequest->body.queryJob != 0) {
      schedulerFreeJob(pRequest->body.queryJob);
    }

    pRequest->code = code;
    terrno = code;
    return pRequest->code;
  }

  bool isWrite = (TDMT_VND_SUBMIT == pRequest->type) || (TDMT_VND_CREATE_TABLE == pRequest->type);
  if (isWrite) {
    pRequest->body.resInfo.numOfRows = res.numOfRows;

    if (pRequest->body.queryJob != 0) {
      schedulerFreeJob(pRequest->body.queryJob);
    }
  }

  pRequest->code = res.code;
  terrno = res.code;
  return pRequest->code;
}

D
dapan1121 已提交
414
/**
 * Run a scheduler job synchronously.
 *
 * On failure the job (when one was created) is freed and the failure code is
 * stored in the request and in terrno. On success, for submit and
 * create-table requests, the affected row count is copied into the request
 * and the job is freed.
 *
 * @return the code stored in pRequest->code.
 */
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
  void*        pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
  SQueryResult res = {0};

  int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr,
                                  pRequest->metric.start, &res);

  pRequest->body.resInfo.execRes = res.res;

  bool hasJob = (pRequest->body.queryJob != 0);

  if (TSDB_CODE_SUCCESS != code) {
    if (hasJob) {
      schedulerFreeJob(pRequest->body.queryJob);
    }

    pRequest->code = code;
    terrno = code;
    return pRequest->code;
  }

  bool isWrite = (TDMT_VND_SUBMIT == pRequest->type) || (TDMT_VND_CREATE_TABLE == pRequest->type);
  if (isWrite) {
    pRequest->body.resInfo.numOfRows = res.numOfRows;

    if (hasJob) {
      schedulerFreeJob(pRequest->body.queryJob);
    }
  }

  pRequest->code = res.code;
  terrno = res.code;
  return pRequest->code;
}
X
Xiaoyu Wang 已提交
445

D
dapan1121 已提交
446
// Build the plan for a parsed query directly into the request's own plan
// slot (body.pDag); see getPlan() for the return value.
int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList) {
  int32_t code = getPlan(pRequest, pQuery, &pRequest->body.pDag, pNodeList);
  return code;
}

D
dapan1121 已提交
450
/**
 * Check table-meta versions for the tables named in a submit response.
 *
 * Every response block with a non-empty table name contributes one
 * (name, schema version) entry to the list passed to catalogChkTbMetaVersion().
 *
 * @param res  the submit response (SSubmitRsp*).
 * @return TSDB_CODE_SUCCESS when the response has no blocks,
 *         TSDB_CODE_OUT_OF_MEMORY when the list cannot be allocated,
 *         otherwise the code from catalogChkTbMetaVersion().
 */
int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet *epset) {
  SSubmitRsp* pSubmitRsp = (SSubmitRsp*)res;
  if (pSubmitRsp->nBlocks <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  SArray* pVersions = taosArrayInit(pSubmitRsp->nBlocks, sizeof(STbSVersion));
  if (pVersions == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t idx = 0; idx < pSubmitRsp->nBlocks; ++idx) {
    SSubmitBlkRsp* pBlk = pSubmitRsp->pBlocks + idx;

    bool named = (pBlk->tblFName != NULL) && (pBlk->tblFName[0] != 0);
    if (named) {
      STbSVersion entry = {.tbFName = pBlk->tblFName, .sver = pBlk->sver};
      taosArrayPush(pVersions, &entry);
    }
  }

  int32_t code = catalogChkTbMetaVersion(pCatalog, pRequest->pTscObj->pAppInfo->pTransporter, epset, pVersions);

  taosArrayDestroy(pVersions);
  return code;
}
D
dapan1121 已提交
481

D
dapan1121 已提交
482 483 484 485 486 487 488
/**
 * Check table-meta versions for the tables reported by a query execution.
 *
 * Each STbVerInfo element of the result contributes one
 * (name, schema version, tag version) entry to the list passed to
 * catalogChkTbMetaVersion().
 *
 * @param res  the execution result (SArray* of STbVerInfo).
 * @return TSDB_CODE_SUCCESS when the result is empty,
 *         TSDB_CODE_OUT_OF_MEMORY when the list cannot be allocated,
 *         otherwise the code from catalogChkTbMetaVersion().
 */
int32_t handleQueryExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet *epset) {
  SArray* pTables = (SArray*)res;
  int32_t numOfTables = taosArrayGetSize(pTables);
  if (numOfTables <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  SArray* pVersions = taosArrayInit(numOfTables, sizeof(STbSVersion));
  if (pVersions == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  for (int32_t idx = 0; idx < numOfTables; ++idx) {
    STbVerInfo* pInfo = taosArrayGet(pTables, idx);
    STbSVersion entry = {.tbFName = pInfo->tbFName, .sver = pInfo->sversion, .tver = pInfo->tversion};
    taosArrayPush(pVersions, &entry);
  }

  int32_t code = catalogChkTbMetaVersion(pCatalog, pRequest->pTscObj->pAppInfo->pTransporter, epset, pVersions);

  taosArrayDestroy(pVersions);
  return code;
}

D
dapan1121 已提交
511 512
// Push the table meta carried by an alter-table result (STableMetaRsp*) into the catalog.
int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) {
  STableMetaRsp* pMetaRsp = (STableMetaRsp*)res;
  return catalogUpdateTableMeta(pCatalog, pMetaRsp);
}

D
dapan1121 已提交
515
/**
 * Post-process the execution result attached to a request, dispatching on
 * the result's message type.
 *
 * @return TSDB_CODE_SUCCESS when there is no result, the catalog-handle error
 *         if the handle cannot be obtained, TSDB_CODE_APP_ERROR for an
 *         unexpected message type, otherwise the selected handler's code.
 */
int32_t handleExecRes(SRequestObj* pRequest) {
  SQueryExecRes* pRes = &pRequest->body.resInfo.execRes;
  if (pRes->res == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  SAppInstInfo* pApp = pRequest->pTscObj->pAppInfo;
  SCatalog*     pCatalog = NULL;

  int32_t code = catalogGetHandle(pApp->clusterId, &pCatalog);
  if (code) {
    return code;
  }

  SEpSet epset = getEpSet_s(&pApp->mgmtEp);

  switch (pRes->msgType) {
    case TDMT_VND_ALTER_TABLE:
    case TDMT_MND_ALTER_STB:
      return handleAlterTbExecRes(pRes->res, pCatalog);
    case TDMT_VND_SUBMIT:
      return handleSubmitExecRes(pRequest, pRes->res, pCatalog, &epset);
    case TDMT_VND_QUERY:
      return handleQueryExecRes(pRequest, pRes->res, pCatalog, &epset);
    default:
      tscError("invalid exec result for request type %d", pRequest->type);
      return TSDB_CODE_APP_ERROR;
  }
}
D
dapan1121 已提交
551

552 553 554 555 556 557 558
// Scheduler completion callback: `param` is the request, whose user callback
// (body.queryFp) is invoked with the request and the completion code.
// pResult is not used here.
void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) {
  SRequestObj* pReq = (SRequestObj*)param;

  // return to client
  pReq->body.queryFp(pReq->body.param, pReq, code);
}

D
dapan 已提交
559
/**
 * Execute a parsed query according to its execution mode and finish the request.
 *
 * @param code       result of the preceding step; the query is only executed
 *                   when it is TSDB_CODE_SUCCESS.
 * @param keepQuery  when false the query object is destroyed here.
 * @param res        optional out: receives the execution result pointer, which
 *                   is then detached from the request.
 * @return pRequest. When execution failed, pRequest->code is set from terrno.
 */
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery, void** res) {
  if (TSDB_CODE_SUCCESS == code) {
    if (QUERY_EXEC_MODE_LOCAL == pQuery->execMode) {
      code = execLocalCmd(pRequest, pQuery);
    } else if (QUERY_EXEC_MODE_RPC == pQuery->execMode) {
      code = execDdlQuery(pRequest, pQuery);
    } else if (QUERY_EXEC_MODE_SCHEDULE == pQuery->execMode) {
      SArray* pQnodes = NULL;
      code = getPlan(pRequest, pQuery, &pRequest->body.pDag, &pQnodes);
      if (TSDB_CODE_SUCCESS == code) {
        code = scheduleQuery(pRequest, pRequest->body.pDag, pQnodes);
      }
      taosArrayDestroy(pQnodes);
    } else if (QUERY_EXEC_MODE_EMPTY_RESULT == pQuery->execMode) {
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
    }
  }

  if (!keepQuery) {
    qDestroyQuery(pQuery);
  }

  handleExecRes(pRequest);

  if (NULL != pRequest && TSDB_CODE_SUCCESS != code) {
    pRequest->code = terrno;
  }

  if (res) {
    // hand the execution result over to the caller
    *res = pRequest->body.resInfo.execRes.res;
    pRequest->body.resInfo.execRes.res = NULL;
  }

  return pRequest;
}

D
stmt  
dapan1121 已提交
603 604 605 606 607
/**
 * Build a request for a SQL statement, parse it and execute it.
 *
 * @return NULL (with terrno set) when the request cannot be built; otherwise
 *         the request. When parsing fails, the request is returned with its
 *         code set to the parse error and nothing is executed.
 */
SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
  SRequestObj* pRequest = NULL;
  SQuery*      pQuery = NULL;

  int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    return NULL;
  }

  code = parseSql(pRequest, false, &pQuery, NULL);
  if (code != TSDB_CODE_SUCCESS) {
    pRequest->code = code;
    return pRequest;
  }

  // launchQueryImpl() takes the preceding step's code as its third argument;
  // the original call omitted it and no longer matched the definition.
  return launchQueryImpl(pRequest, pQuery, code, false, NULL);
}

/**
 * Start executing a parsed query without waiting for it to complete.
 *
 * In schedule mode the plan is created here and handed to the scheduler with
 * schedulerExecCb() as the completion callback. If any step reports a
 * failure, pRequest->code is set from terrno.
 */
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery) {
  int32_t code = 0;

  switch (pQuery->execMode) {
    case QUERY_EXEC_MODE_LOCAL:
      code = execLocalCmd(pRequest, pQuery);
      break;
    case QUERY_EXEC_MODE_RPC:
      code = asyncExecDdlQuery(pRequest, pQuery);
      break;
    case QUERY_EXEC_MODE_SCHEDULE: {
      SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));

      pRequest->type = pQuery->msgType;

      SPlanContext cxt = {.queryId = pRequest->requestId,
                          .acctId = pRequest->pTscObj->acctId,
                          .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
                          .pAstRoot = pQuery->pRoot,
                          .showRewrite = pQuery->showRewrite,
                          .pMsg = pRequest->msgBuf,
                          .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE};

      SAppInstInfo* pAppInfo = getAppInfo(pRequest);

      code = qCreateQueryPlan(&cxt, &pRequest->body.pDag, pNodeList);
      if (TSDB_CODE_SUCCESS == code) {
        // keep the launch result so that a failure to start the job is not lost
        code = schedulerAsyncExecJob(pAppInfo->pTransporter, pNodeList, pRequest->body.pDag, &pRequest->body.queryJob,
                                     pRequest->sqlstr, pRequest->metric.start, schedulerExecCb, pRequest);
      }

      taosArrayDestroy(pNodeList);
      break;
    }
    case QUERY_EXEC_MODE_EMPTY_RESULT:
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
      break;
    default:
      break;
  }

  if (NULL != pRequest && TSDB_CODE_SUCCESS != code) {
    pRequest->code = terrno;
  }
}

D
dapan1121 已提交
685
/**
 * Refresh the catalog entries for every database and table recorded on a request.
 *
 * @return TSDB_CODE_QRY_APP_ERROR when the request lists neither databases
 *         nor tables; otherwise the first failing catalog code, or the code
 *         of the last refresh when all succeed.
 */
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
  int32_t numOfDbs = taosArrayGetSize(pRequest->dbList);
  int32_t numOfTables = taosArrayGetSize(pRequest->tableList);

  if (numOfDbs <= 0 && numOfTables <= 0) {
    return TSDB_CODE_QRY_APP_ERROR;
  }

  SCatalog* pCatalog = NULL;
  int32_t   code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  SEpSet epset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  void*  pTransporter = pTscObj->pAppInfo->pTransporter;

  for (int32_t idx = 0; idx < numOfDbs; ++idx) {
    char* dbFName = taosArrayGet(pRequest->dbList, idx);

    code = catalogRefreshDBVgInfo(pCatalog, pTransporter, &epset, dbFName);
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }
  }

  for (int32_t idx = 0; idx < numOfTables; ++idx) {
    SName* pName = taosArrayGet(pRequest->tableList, idx);

    code = catalogRefreshTableMeta(pCatalog, pTransporter, &epset, pName, -1);
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }
  }

  return code;
}

D
dapan1121 已提交
723 724
/**
 * Remove the cached table meta for every table name in a list.
 *
 * @param tbList  array of SName entries.
 * @return the catalog-handle error if the handle cannot be obtained,
 *         otherwise TSDB_CODE_SUCCESS.
 */
int32_t removeMeta(STscObj* pTscObj, SArray* tbList) {
  int32_t   numOfTables = taosArrayGetSize(tbList);
  SCatalog* pCatalog = NULL;

  int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  int32_t idx = 0;
  while (idx < numOfTables) {
    SName* pName = taosArrayGet(tbList, idx);
    catalogRemoveTableMeta(pCatalog, pName);
    ++idx;
  }

  return TSDB_CODE_SUCCESS;
}

D
dapan1121 已提交
739 740
/**
 * Execute a SQL statement, retrying after a meta refresh while the failure is
 * one the client can handle (NEED_CLIENT_HANDLE_ERROR).
 *
 * Each retry destroys the previous request and launches a new one. The loop
 * stops on success, on an error the client cannot handle, when the meta
 * refresh fails, or once retryNum has reached REQUEST_MAX_TRY_TIMES.
 *
 * @return the last request, or NULL when launchQuery() could not create one.
 */
SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
  SRequestObj* pRequest = NULL;
  int32_t      retryNum = 0;
  int32_t      code = 0;

  do {
    destroyRequest(pRequest);
    pRequest = launchQuery(pTscObj, sql, sqlLen);
    if (pRequest == NULL || TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) {
      break;
    }

    code = refreshMeta(pTscObj, pRequest);
    if (code) {
      pRequest->code = code;
      break;
    }
  } while (retryNum++ < REQUEST_MAX_TRY_TIMES);

  // launchQuery() may have returned NULL, so the request must be checked
  // before its type is read.
  if (pRequest != NULL && NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) {
    removeMeta(pTscObj, pRequest->tableList);
  }

  return pRequest;
}

S
Shengliang Guan 已提交
765 766
/**
 * Initialise a management endpoint set from up to two "fqdn[:port]" strings.
 *
 * @param firstEp   first endpoint; NULL or "" is skipped.
 * @param secondEp  second endpoint; NULL or "" is skipped. One that fails to
 *                  parse is left out of the set instead of being counted.
 * @param pEpSet    out: version reset to 0, endpoints filled in order.
 * @return 0 on success; -1 with terrno = TSDB_CODE_TSC_INVALID_FQDN when an
 *         endpoint string is too long, the first endpoint cannot be parsed,
 *         or no endpoint ends up in the set.
 */
int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
  pEpSet->version = 0;

  // init mnode ip set
  SEpSet* mgmtEpSet = &(pEpSet->epSet);
  mgmtEpSet->numOfEps = 0;
  mgmtEpSet->inUse = 0;

  if (firstEp && firstEp[0] != 0) {
    if (strlen(firstEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
    }

    int32_t code = taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[0]);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      // same failure value as every other error path in this function
      return -1;
    }

    mgmtEpSet->numOfEps++;
  }

  if (secondEp && secondEp[0] != 0) {
    if (strlen(secondEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
    }

    // only count the second endpoint when it was actually parsed
    int32_t code = taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
    if (code == TSDB_CODE_SUCCESS) {
      mgmtEpSet->numOfEps++;
    }
  }

  if (mgmtEpSet->numOfEps == 0) {
    terrno = TSDB_CODE_TSC_INVALID_FQDN;
    return -1;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
806
STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
L
Liu Jicong 已提交
807
                         SAppInstInfo* pAppInfo, int connType) {
808
  STscObj* pTscObj = createTscObj(user, auth, db, connType, pAppInfo);
809 810 811 812 813
  if (NULL == pTscObj) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return pTscObj;
  }

814
  SRequestObj* pRequest = createRequest(pTscObj, param, TDMT_MND_CONNECT);
815 816 817
  if (pRequest == NULL) {
    destroyTscObj(pTscObj);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
818 819 820
    return NULL;
  }

821
  SMsgSendInfo* body = buildConnectMsg(pRequest);
822 823

  int64_t transporterId = 0;
H
Haojun Liao 已提交
824
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
825 826 827

  tsem_wait(&pRequest->body.rspSem);
  if (pRequest->code != TSDB_CODE_SUCCESS) {
dengyihao's avatar
dengyihao 已提交
828 829
    const char* errorMsg =
        (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
830
    fprintf(stderr, "failed to connect to server, reason: %s\n\n", errorMsg);
831

832
    terrno = pRequest->code;
833 834 835 836
    destroyRequest(pRequest);
    taos_close(pTscObj);
    pTscObj = NULL;
  } else {
D
dapan1121 已提交
837
    tscDebug("0x%" PRIx64 " connection is opening, connId:%u, dnodeConn:%p, reqId:0x%" PRIx64, pTscObj->id,
dengyihao's avatar
dengyihao 已提交
838
             pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId);
839 840 841 842 843 844
    destroyRequest(pRequest);
  }

  return pTscObj;
}

845
/*
 * Build the TDMT_MND_CONNECT message for the given request.
 *
 * The connect payload is filled from the request's connection object (db, connType, user,
 * password) and from the process-wide appInfo (pid, start time, app name), then serialized
 * into a freshly allocated buffer attached to the returned send-info.
 *
 * Returns the send-info on success, or NULL with terrno = TSDB_CODE_TSC_OUT_OF_MEMORY when
 * either the send-info or the payload buffer cannot be allocated.
 */
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  pMsgSendInfo->msgType = TDMT_MND_CONNECT;

  pMsgSendInfo->requestObjRefId = pRequest->self;
  pMsgSendInfo->requestId = pRequest->requestId;
  pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)];
  pMsgSendInfo->param = pRequest;

  SConnectReq connectReq = {0};
  STscObj*    pObj = pRequest->pTscObj;

  // getDbOfConnection() returns a heap copy (or NULL when no db is selected); release it after copying
  char* db = getDbOfConnection(pObj);
  if (db != NULL) {
    tstrncpy(connectReq.db, db, sizeof(connectReq.db));
  }
  taosMemoryFreeClear(db);

  connectReq.connType = pObj->connType;
  connectReq.pid = htonl(appInfo.pid);
  connectReq.startTime = htobe64(appInfo.startTime);

  tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));
  tstrncpy(connectReq.user, pObj->user, sizeof(connectReq.user));
  tstrncpy(connectReq.passwd, pObj->pass, sizeof(connectReq.passwd));

  // first pass with a NULL buffer only computes the encoded length
  int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
  void*   pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    // do not return a message whose len is set but whose payload is missing
    taosMemoryFree(pMsgSendInfo);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }
  tSerializeSConnectReq(pReq, contLen, &connectReq);

  pMsgSendInfo->msgInfo.len = contLen;
  pMsgSendInfo->msgInfo.pData = pReq;
  return pMsgSendInfo;
}

885
/*
 * Release a send-info object: its payload buffer (msgInfo.pData) is freed first, then the
 * object itself. pMsgBody must not be NULL.
 */
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
  assert(pMsgBody != NULL);

  // payload before envelope, since the envelope holds the only pointer to the payload here
  taosMemoryFreeClear(pMsgBody->msgInfo.pData);

  taosMemoryFreeClear(pMsgBody);
}
890

D
dapan1121 已提交
891 892 893 894
/*
 * Apply an endpoint-set change reported with a response to the cached endpoint information.
 *
 * Nothing happens when pEpSet is NULL. For an mnode target the application's mgmtEp is
 * updated; for a vnode target the catalog entry of (dbFName, vgId) is updated. Both need a
 * non-NULL pTscObj, otherwise an error is logged and the update is dropped. Any other target
 * type is only logged at debug level.
 */
void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg, SEpSet* pEpSet) {
  if (pEpSet == NULL) {
    return;
  }

  if (pSendInfo->target.type == TARGET_TYPE_MNODE) {
    if (pTscObj == NULL) {
      tscError("mnode epset changed but not able to update it, reqObjRefId:%" PRIx64, pSendInfo->requestObjRefId);
      return;
    }

    updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
    return;
  }

  if (pSendInfo->target.type == TARGET_TYPE_VNODE) {
    if (pTscObj == NULL) {
      tscError("vnode epset changed but not able to update it, reqObjRefId:%" PRIx64, pSendInfo->requestObjRefId);
      return;
    }

    SCatalog* pCatalog = NULL;
    int32_t   rc = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
    if (rc != TSDB_CODE_SUCCESS) {
      tscError("fail to get catalog handle, clusterId:%" PRIx64 ", error %s", pTscObj->pAppInfo->clusterId,
               tstrerror(rc));
      return;
    }

    catalogUpdateVgEpSet(pCatalog, pSendInfo->target.dbFName, pSendInfo->target.vgId, pEpSet);
    return;
  }

  // neither mnode nor vnode: nothing to refresh
  tscDebug("epset changed, not updated, msgType %s", TMSG_INFO(pMsg->msgType));
}

928
/*
 * Transport callback for a server response.
 *
 * pMsg->info.ahandle carries the SMsgSendInfo that was submitted with the request. The
 * function records the response time on the originating request (when it can still be
 * acquired from clientReqRefPool), applies any endpoint-set change, copies the response
 * payload into a private buffer and invokes the send-info's callback with it. Finally the
 * transport buffer and the send-info are released.
 *
 * parent is not used. pEpSet may be NULL (no endpoint change).
 */
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
  assert(pMsg->info.ahandle != NULL);
  STscObj* pTscObj = NULL;

  if (pSendInfo->requestObjRefId != 0) {
    SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
    if (pRequest != NULL) {
      assert(pRequest->self == pSendInfo->requestObjRefId);

      pRequest->metric.rsp = taosGetTimestampUs();
      pTscObj = pRequest->pTscObj;

      // divide in 64 bits before narrowing: the microsecond delta overflows int32_t after ~35 minutes
      int64_t elapsedUs = pRequest->metric.rsp - pRequest->metric.start;
      int32_t elapsedMs = (int32_t)(elapsedUs / 1000);
      if (pMsg->code == TSDB_CODE_SUCCESS) {
        tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%" PRIx64, pRequest->self,
                 TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsedMs, pRequest->requestId);
      } else {
        tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%" PRIx64, pRequest->self,
                 TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsedMs, pRequest->requestId);
      }

      taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
    } else {
      // the request is no longer in the ref pool; skip the metrics instead of dereferencing NULL
      tscDebug("request not found in ref pool, reqObjRefId:%" PRIx64 ", msgType %s", pSendInfo->requestObjRefId,
               TMSG_INFO(pMsg->msgType));
    }
  }

  updateTargetEpSet(pSendInfo, pTscObj, pMsg, pEpSet);

  SDataBuf buf = {.len = pMsg->contLen, .pData = NULL, .handle = pMsg->info.handle};

  if (pMsg->contLen > 0) {
    // the callback gets its own copy because pMsg->pCont is freed below
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
    if (buf.pData == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    }
  }

  pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pSendInfo);
}
973

dengyihao's avatar
dengyihao 已提交
974
TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) {
975 976 977 978 979 980 981 982 983 984
  tscDebug("try to connect to %s:%u by auth, user:%s db:%s", ip, port, user, db);
  if (user == NULL) {
    user = TSDB_DEFAULT_USER;
  }

  if (auth == NULL) {
    tscError("No auth info is given, failed to connect to server");
    return NULL;
  }

L
Liu Jicong 已提交
985
  return taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY);
986 987
}

dengyihao's avatar
dengyihao 已提交
988 989 990
TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen,
                     const char* db, int dbLen, uint16_t port) {
  char ipStr[TSDB_EP_LEN] = {0};
991
  char dbStr[TSDB_DB_NAME_LEN] = {0};
dengyihao's avatar
dengyihao 已提交
992 993
  char userStr[TSDB_USER_LEN] = {0};
  char passStr[TSDB_PASSWORD_LEN] = {0};
994

dengyihao's avatar
dengyihao 已提交
995
  strncpy(ipStr, ip, TMIN(TSDB_EP_LEN - 1, ipLen));
dengyihao's avatar
dengyihao 已提交
996 997
  strncpy(userStr, user, TMIN(TSDB_USER_LEN - 1, userLen));
  strncpy(passStr, pass, TMIN(TSDB_PASSWORD_LEN - 1, passLen));
dengyihao's avatar
dengyihao 已提交
998
  strncpy(dbStr, db, TMIN(TSDB_DB_NAME_LEN - 1, dbLen));
999
  return taos_connect(ipStr, userStr, passStr, dbStr, port);
1000 1001
}

L
Liu Jicong 已提交
1002
/*
 * Point pResultInfo->row[] / length[] at the values of the row indexed by
 * pResultInfo->current, one entry per column.
 *
 * Variable-length columns are located through the per-row offset array, where an offset of
 * -1 marks NULL. Fixed-length columns are located by bytes * current and use the null
 * bitmap. A NULL value yields row[i] = NULL and length[i] = 0.
 */
void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
  for (int32_t col = 0; col < pResultInfo->numOfCols; ++col) {
    SResultColumn* pColumn = &pResultInfo->pCol[col];

    int32_t colType = pResultInfo->fields[col].type;
    int32_t colBytes = pResultInfo->fields[col].bytes;

    if (IS_VAR_DATA_TYPE(colType)) {
      if (pColumn->offset[pResultInfo->current] == -1) {
        pResultInfo->row[col] = NULL;
        pResultInfo->length[col] = 0;
        continue;
      }

      // var-length cell: header carries the length, value follows it
      char* pCell = pColumn->offset[pResultInfo->current] + pColumn->pData;
      pResultInfo->length[col] = varDataLen(pCell);
      pResultInfo->row[col] = varDataVal(pCell);
      continue;
    }

    if (colDataIsNull_f(pColumn->nullbitmap, pResultInfo->current)) {
      pResultInfo->row[col] = NULL;
      pResultInfo->length[col] = 0;
    } else {
      pResultInfo->row[col] = pColumn->pData + colBytes * pResultInfo->current;
      pResultInfo->length[col] = colBytes;
    }
  }
}

1031
/*
 * Return the next row (or row-pointer array) of the request's result set, fetching the next
 * block from the scheduler when the current block is exhausted.
 *
 * Returns NULL when the result set is complete, when fetching or decoding fails
 * (pRequest->code holds the error) or when the fetched block is empty. When setupOneRowPtr
 * is true the row pointers are positioned on the current row and the cursor advances by one.
 *
 * NOTE(review): a second definition named doFetchRows appears further down in this file;
 * confirm which one is the live implementation.
 */
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
  assert(pRequest != NULL);

  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;

  bool blockExhausted = (pResultInfo->pData == NULL) || (pResultInfo->current >= pResultInfo->numOfRows);
  if (blockExhausted) {
    // All data has returned to App already, no need to try again
    if (pResultInfo->completed) {
      pResultInfo->numOfRows = 0;
      return NULL;
    }

    pRequest->code = schedulerFetchRows(pRequest->body.queryJob, (void**)&pResultInfo->pData);
    if (pRequest->code != TSDB_CODE_SUCCESS) {
      pResultInfo->numOfRows = 0;
      return NULL;
    }

    pRequest->code = setQueryResultFromRsp(pResultInfo, (SRetrieveTableRsp*)pResultInfo->pData, convertUcs4, true);
    if (pRequest->code != TSDB_CODE_SUCCESS) {
      pResultInfo->numOfRows = 0;
      return NULL;
    }

    tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
             pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed,
             pRequest->requestId);

    if (pResultInfo->numOfRows == 0) {
      return NULL;
    }
  }

  if (setupOneRowPtr) {
    doSetOneRowPtr(pResultInfo);
    pResultInfo->current += 1;
  }

  return pResultInfo->row;
}

1072 1073 1074 1075 1076
/*
 * Scheduler completion callback: param is the SRequestObj the job belongs to. The request's
 * own query callback is invoked with the request's user parameter, the request and the
 * result code. pResult is not used here.
 */
void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) {
  SRequestObj* pReq = param;

  // return to client
  pReq->body.queryFp(pReq->body.param, pReq, code);
}
D
dapan1121 已提交
1078

1079
/*
 * Return the next row (or row-pointer array) of the request's result set. When the current
 * block is exhausted the next block is fetched through the asynchronous fetch API and this
 * function blocks on the sync-query semaphore until the fetch callback has run.
 *
 * Returns NULL when the result set is complete, when the fetch reported an error, or when
 * the fetched block holds no rows. When setupOneRowPtr is true the row pointers are
 * positioned on the current row and the cursor advances by one.
 *
 * convertUcs4 is kept for interface compatibility; conversion is decided by the async fetch
 * path and the flag is not consulted here.
 */
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
  assert(pRequest != NULL);

  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
    // All data has returned to App already, no need to try again
    if (pResultInfo->completed) {
      pResultInfo->numOfRows = 0;
      return NULL;
    }

    // presumably body.param is the SSyncQueryParam installed by the synchronous query path - confirm
    SSyncQueryParam* pParam = pRequest->body.param;

    // always converted in async query: convertUcs4
    taos_fetch_rows_a(pRequest, syncFetchFn, pParam);
    tsem_wait(&pParam->sem);

    // Without these checks an empty or failed fetch would fall through to doSetOneRowPtr()
    // and read row 0 of a block that has no rows (or whose row arrays were never prepared).
    if (pRequest->code != TSDB_CODE_SUCCESS) {
      pResultInfo->numOfRows = 0;
      return NULL;
    }

    if (pResultInfo->numOfRows == 0) {
      return NULL;
    }
  }

  if (setupOneRowPtr) {
    doSetOneRowPtr(pResultInfo);
    pResultInfo->current += 1;
  }

  return pResultInfo->row;
}

1128
/*
 * Lazily allocate the per-column bookkeeping arrays of a result-info (row, pCol, length,
 * convertBuf), each sized by numOfCols. Does nothing when row is already allocated.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when any allocation fails. On
 * failure all four arrays are released and reset to NULL so that a later call retries the
 * allocation instead of seeing a non-NULL row with missing sibling arrays.
 */
static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
  if (pResInfo->row != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pResInfo->row = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
  pResInfo->pCol = taosMemoryCalloc(pResInfo->numOfCols, sizeof(SResultColumn));
  pResInfo->length = taosMemoryCalloc(pResInfo->numOfCols, sizeof(int32_t));
  pResInfo->convertBuf = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);

  if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL || pResInfo->convertBuf == NULL) {
    taosMemoryFreeClear(pResInfo->row);
    taosMemoryFreeClear(pResInfo->pCol);
    taosMemoryFreeClear(pResInfo->length);
    taosMemoryFreeClear(pResInfo->convertBuf);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

1143 1144 1145 1146
static char* parseTagDatatoJson(void* p) {
  char*  string = NULL;
  cJSON* json = cJSON_CreateObject();
  if (json == NULL) {
wmmhello's avatar
wmmhello 已提交
1147 1148 1149
    goto end;
  }

C
Cary Xu 已提交
1150 1151 1152 1153 1154 1155
  SArray* pTagVals = NULL;
  if (tTagToValArray((const STag*)p, &pTagVals) != 0) {
    goto end;
  }

  int16_t nCols = taosArrayGetSize(pTagVals);
1156
  char    tagJsonKey[256] = {0};
wmmhello's avatar
wmmhello 已提交
1157
  for (int j = 0; j < nCols; ++j) {
C
Cary Xu 已提交
1158
    STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
wmmhello's avatar
wmmhello 已提交
1159 1160
    // json key  encode by binary
    memset(tagJsonKey, 0, sizeof(tagJsonKey));
wmmhello's avatar
wmmhello 已提交
1161
    memcpy(tagJsonKey, pTagVal->pKey, strlen(pTagVal->pKey));
wmmhello's avatar
wmmhello 已提交
1162
    // json value
L
Liu Jicong 已提交
1163
    char type = pTagVal->type;
1164
    if (type == TSDB_DATA_TYPE_NULL) {
wmmhello's avatar
wmmhello 已提交
1165
      cJSON* value = cJSON_CreateNull();
1166
      if (value == NULL) {
wmmhello's avatar
wmmhello 已提交
1167 1168 1169
        goto end;
      }
      cJSON_AddItemToObject(json, tagJsonKey, value);
1170
    } else if (type == TSDB_DATA_TYPE_NCHAR) {
wmmhello's avatar
wmmhello 已提交
1171
      cJSON* value = NULL;
wmmhello's avatar
wmmhello 已提交
1172 1173 1174
      if (pTagVal->nData > 0) {
        char*   tagJsonValue = taosMemoryCalloc(pTagVal->nData, 1);
        int32_t length = taosUcs4ToMbs((TdUcs4*)pTagVal->pData, pTagVal->nData, tagJsonValue);
wmmhello's avatar
wmmhello 已提交
1175
        if (length < 0) {
L
Liu Jicong 已提交
1176 1177
          tscError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC, tsCharset,
                   pTagVal->pData);
wmmhello's avatar
wmmhello 已提交
1178 1179 1180 1181 1182
          taosMemoryFree(tagJsonValue);
          goto end;
        }
        value = cJSON_CreateString(tagJsonValue);
        taosMemoryFree(tagJsonValue);
1183
        if (value == NULL) {
wmmhello's avatar
wmmhello 已提交
1184 1185
          goto end;
        }
wmmhello's avatar
wmmhello 已提交
1186
      } else if (pTagVal->nData == 0) {
wmmhello's avatar
wmmhello 已提交
1187
        value = cJSON_CreateString("");
1188
      } else {
wmmhello's avatar
wmmhello 已提交
1189 1190 1191 1192
        ASSERT(0);
      }

      cJSON_AddItemToObject(json, tagJsonKey, value);
1193
    } else if (type == TSDB_DATA_TYPE_DOUBLE) {
wmmhello's avatar
wmmhello 已提交
1194
      double jsonVd = *(double*)(&pTagVal->i64);
wmmhello's avatar
wmmhello 已提交
1195
      cJSON* value = cJSON_CreateNumber(jsonVd);
1196
      if (value == NULL) {
wmmhello's avatar
wmmhello 已提交
1197 1198 1199
        goto end;
      }
      cJSON_AddItemToObject(json, tagJsonKey, value);
1200
    } else if (type == TSDB_DATA_TYPE_BOOL) {
wmmhello's avatar
wmmhello 已提交
1201
      char   jsonVd = *(char*)(&pTagVal->i64);
wmmhello's avatar
wmmhello 已提交
1202
      cJSON* value = cJSON_CreateBool(jsonVd);
1203
      if (value == NULL) {
wmmhello's avatar
wmmhello 已提交
1204 1205 1206
        goto end;
      }
      cJSON_AddItemToObject(json, tagJsonKey, value);
1207
    } else {
wmmhello's avatar
wmmhello 已提交
1208
      ASSERT(0);
wmmhello's avatar
wmmhello 已提交
1209 1210 1211 1212 1213 1214 1215 1216
    }
  }
  string = cJSON_PrintUnformatted(json);
end:
  cJSON_Delete(json);
  return string;
}

1217 1218 1219 1220 1221
/*
 * Convert the NCHAR and JSON columns of the current data block into client-side text.
 *
 * For every such column with colLength[i] > 0 a per-column conversion buffer
 * (pResultInfo->convertBuf[i]) is (re)allocated, each non-NULL row is converted into it as a
 * var-length value, the row's offset is rewritten to point into the conversion buffer, and
 * the column's pData / row[i] are switched to that buffer.
 *
 *  - NCHAR: UCS-4 -> local multi-byte charset.
 *  - JSON:  the leading type byte selects the rendering: null literal, full JSON object
 *           (via parseTagDatatoJson), quoted string, "%.9lf" number, or true/false.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when a buffer cannot be allocated or
 * a JSON tag cannot be rendered.
 */
static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int32_t numOfCols, int32_t* colLength) {
  for (int32_t i = 0; i < numOfCols; ++i) {
    int32_t type = pResultInfo->fields[i].type;
    int32_t bytes = pResultInfo->fields[i].bytes;

    if (type == TSDB_DATA_TYPE_NCHAR && colLength[i] > 0) {
      char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]);
      if (p == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      pResultInfo->convertBuf[i] = p;

      SResultColumn* pCol = &pResultInfo->pCol[i];
      for (int32_t j = 0; j < numOfRows; ++j) {
        if (pCol->offset[j] != -1) {
          char* pStart = pCol->offset[j] + pCol->pData;

          int32_t len = taosUcs4ToMbs((TdUcs4*)varDataVal(pStart), varDataLen(pStart), varDataVal(p));
          if (len < 0) {
            // same policy as the JSON string case below: a failed conversion yields an empty value
            // instead of a negative length being stored and used to advance the write pointer
            tscError("charset:%s to %s. convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset);
            len = 0;
          }
          ASSERT(len <= bytes);
          ASSERT((p + len) < (pResultInfo->convertBuf[i] + colLength[i]));

          varDataSetLen(p, len);
          pCol->offset[j] = (p - pResultInfo->convertBuf[i]);
          p += (len + VARSTR_HEADER_SIZE);
        }
      }

      pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
      pResultInfo->row[i] = pResultInfo->pCol[i].pData;
    } else if (type == TSDB_DATA_TYPE_JSON && colLength[i] > 0) {
      char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]);
      if (p == NULL) {
        return TSDB_CODE_OUT_OF_MEMORY;
      }

      pResultInfo->convertBuf[i] = p;
      int32_t        len = 0;  // bytes of the conversion buffer used so far
      SResultColumn* pCol = &pResultInfo->pCol[i];
      for (int32_t j = 0; j < numOfRows; ++j) {
        if (pCol->offset[j] != -1) {
          char* pStart = pCol->offset[j] + pCol->pData;

          // first byte is the inner value type, the value itself follows
          int32_t jsonInnerType = *pStart;
          char*   jsonInnerData = pStart + CHAR_BYTES;
          char    dst[TSDB_MAX_JSON_TAG_LEN] = {0};
          if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
            sprintf(varDataVal(dst), "%s", TSDB_DATA_NULL_STR_L);
            varDataSetLen(dst, strlen(varDataVal(dst)));
          } else if (jsonInnerType == TD_TAG_JSON) {
            char* jsonString = parseTagDatatoJson(jsonInnerData);
            if (jsonString == NULL) {
              // parseTagDatatoJson() returns NULL on failure; STR_TO_VARSTR would call strlen(NULL)
              tscError("failed to convert json tag to string");
              return TSDB_CODE_OUT_OF_MEMORY;
            }
            STR_TO_VARSTR(dst, jsonString);
            taosMemoryFree(jsonString);
          } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
            *(char*)varDataVal(dst) = '\"';
            int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData),
                                           varDataVal(dst) + CHAR_BYTES);
            if (length <= 0) {
              tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset,
                       varDataVal(jsonInnerData));
              length = 0;
            }
            varDataSetLen(dst, length + CHAR_BYTES * 2);
            *(char*)POINTER_SHIFT(varDataVal(dst), length + CHAR_BYTES) = '\"';
          } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
            // the value sits one byte into the cell, so it is not guaranteed to be aligned for a double
            double jsonVd = 0;
            memcpy(&jsonVd, jsonInnerData, sizeof(jsonVd));
            sprintf(varDataVal(dst), "%.9lf", jsonVd);
            varDataSetLen(dst, strlen(varDataVal(dst)));
          } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
            sprintf(varDataVal(dst), "%s", (*((char*)jsonInnerData) == 1) ? "true" : "false");
            varDataSetLen(dst, strlen(varDataVal(dst)));
          } else {
            ASSERT(0);
          }

          // rendered text can be longer than the binary form: grow the buffer when needed
          if (len + varDataTLen(dst) > colLength[i]) {
            p = taosMemoryRealloc(pResultInfo->convertBuf[i], len + varDataTLen(dst));
            if (p == NULL) {
              return TSDB_CODE_OUT_OF_MEMORY;
            }

            pResultInfo->convertBuf[i] = p;
          }
          p = pResultInfo->convertBuf[i] + len;
          memcpy(p, dst, varDataTLen(dst));
          pCol->offset[j] = len;
          len += varDataTLen(dst);
        }
      }

      pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
      pResultInfo->row[i] = pResultInfo->pCol[i].pData;
    }
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
1315 1316
/*
 * Wire the per-column pointers of pResultInfo to the raw data block in pResultInfo->pData.
 *
 * Layout as read by this function: int32 data length, uint64 group id, numOfCols int32
 * column lengths (byte-swapped in place with htonl), then for each column either a per-row
 * int32 offset array (variable-length types) or a null bitmap (fixed-length types) followed
 * by colLength[i] bytes of column data.
 *
 * Returns TSDB_CODE_SUCCESS immediately when numOfRows is 0. Otherwise returns the result of
 * preparing the pointer arrays and, when convertUcs4 is set, of the NCHAR/JSON conversion.
 */
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows,
                         bool convertUcs4) {
  assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL);
  if (numOfRows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = doPrepareResPtr(pResultInfo);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  char* cursor = (char*)pResultInfo->pData;

  int32_t dataLen = *(int32_t*)cursor;
  cursor += sizeof(int32_t);

  // group id follows; it is not needed here, so just step over it
  cursor += sizeof(uint64_t);

  int32_t* colLength = (int32_t*)cursor;
  cursor += sizeof(int32_t) * numOfCols;

  for (int32_t col = 0; col < numOfCols; ++col) {
    colLength[col] = htonl(colLength[col]);
    ASSERT(colLength[col] < dataLen);

    SResultColumn* pColumn = &pResultInfo->pCol[col];
    if (IS_VAR_DATA_TYPE(pResultInfo->fields[col].type)) {
      pColumn->offset = (int32_t*)cursor;
      cursor += numOfRows * sizeof(int32_t);
    } else {
      pColumn->nullbitmap = cursor;
      cursor += BitmapLen(pResultInfo->numOfRows);
    }

    pColumn->pData = cursor;
    pResultInfo->length[col] = pResultInfo->fields[col].bytes;
    pResultInfo->row[col] = pColumn->pData;

    cursor += colLength[col];
  }

  if (!convertUcs4) {
    return code;
  }

  // convert UCS4-LE encoded character to native multi-bytes character in current data block.
  return doConvertUCS4(pResultInfo, numOfRows, numOfCols, colLength);
}

H
Haojun Liao 已提交
1366
/*
 * Return a heap-allocated copy of the connection's current database name, taken while
 * holding the connection mutex. Returns NULL when no database is set (empty name) or when
 * the copy cannot be allocated. The caller releases the returned string.
 */
char* getDbOfConnection(STscObj* pObj) {
  char* dbCopy = NULL;

  taosThreadMutexLock(&pObj->mutex);
  if (pObj->db[0] != '\0') {
    dbCopy = strndup(pObj->db, tListLen(pObj->db));
  }
  taosThreadMutexUnlock(&pObj->mutex);

  return dbCopy;
}

/*
 * Set the connection's current database name. The name is copied (bounded by the size of
 * pTscObj->db) while holding the connection mutex. Neither argument may be NULL.
 */
void setConnectionDB(STscObj* pTscObj, const char* db) {
  assert(db != NULL && pTscObj != NULL);

  taosThreadMutexLock(&pTscObj->mutex);
  tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
  taosThreadMutexUnlock(&pTscObj->mutex);
}
S
Shengliang Guan 已提交
1384

H
Haojun Liao 已提交
1385 1386 1387 1388 1389 1390 1391 1392 1393 1394
/*
 * Clear the connection's current database name (set it to the empty string) while holding
 * the connection mutex. A NULL connection is ignored.
 */
void resetConnectDB(STscObj* pTscObj) {
  if (NULL == pTscObj) {
    return;
  }

  taosThreadMutexLock(&pTscObj->mutex);
  pTscObj->db[0] = '\0';
  taosThreadMutexUnlock(&pTscObj->mutex);
}

L
Liu Jicong 已提交
1395 1396
/*
 * Install a retrieve response as the current data block of a result-info.
 *
 * When freeAfterUse is set, the previously held response message is released first. The
 * response header fields (row count, completed flag, payload length, precision) are copied
 * into pResultInfo, the row cursor is reset, totalRows is increased by the new row count,
 * and the per-column pointers are set up through setResultDataPtr(), whose result is
 * returned.
 */
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4,
                              bool freeAfterUse) {
  assert(pResultInfo != NULL && pRsp != NULL);

  if (freeAfterUse) {
    taosMemoryFreeClear(pResultInfo->pRspMsg);
  }

  // the result-info now refers to the new response message and its data section
  pResultInfo->pRspMsg = (const char*)pRsp;
  pResultInfo->pData = (void*)pRsp->data;

  pResultInfo->numOfRows = htonl(pRsp->numOfRows);
  pResultInfo->payloadLen = htonl(pRsp->compLen);
  pResultInfo->completed = (pRsp->completed == 1);
  pResultInfo->precision = pRsp->precision;
  pResultInfo->current = 0;

  // TODO handle the compressed case
  pResultInfo->totalRows += pResultInfo->numOfRows;

  return setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows,
                          convertUcs4);
}
S
Shengliang Guan 已提交
1414 1415 1416 1417 1418 1419

TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* details, int maxlen) {
  TSDB_SERVER_STATUS code = TSDB_SRV_STATUS_UNAVAILABLE;
  void*              clientRpc = NULL;
  SServerStatusRsp   statusRsp = {0};
  SEpSet             epSet = {.inUse = 0, .numOfEps = 1};
S
Shengliang Guan 已提交
1420
  SRpcMsg            rpcMsg = {.info.ahandle = (void*)0x9526, .msgType = TDMT_DND_SERVER_STATUS};
S
Shengliang Guan 已提交
1421 1422 1423 1424 1425 1426 1427 1428 1429 1430 1431 1432 1433 1434 1435 1436 1437 1438
  SRpcMsg            rpcRsp = {0};
  SRpcInit           rpcInit = {0};
  char               pass[TSDB_PASSWORD_LEN + 1] = {0};

  rpcInit.label = "CHK";
  rpcInit.numOfThreads = 1;
  rpcInit.cfp = NULL;
  rpcInit.sessions = 16;
  rpcInit.connType = TAOS_CONN_CLIENT;
  rpcInit.idleTime = tsShellActivityTimer * 1000;
  rpcInit.user = "_dnd";

  clientRpc = rpcOpen(&rpcInit);
  if (clientRpc == NULL) {
    tscError("failed to init server status client");
    goto _OVER;
  }

S
Shengliang Guan 已提交
1439 1440 1441 1442 1443 1444 1445 1446
  if (fqdn == NULL) {
    fqdn = tsLocalFqdn;
  }

  if (port == 0) {
    port = tsServerPort;
  }

S
Shengliang Guan 已提交
1447 1448 1449 1450 1451 1452 1453 1454 1455 1456 1457 1458 1459 1460 1461
  tstrncpy(epSet.eps[0].fqdn, fqdn, TSDB_FQDN_LEN);
  epSet.eps[0].port = (uint16_t)port;
  rpcSendRecv(clientRpc, &epSet, &rpcMsg, &rpcRsp);

  if (rpcRsp.code != 0 || rpcRsp.contLen <= 0 || rpcRsp.pCont == NULL) {
    tscError("failed to send server status req since %s", terrstr());
    goto _OVER;
  }

  if (tDeserializeSServerStatusRsp(rpcRsp.pCont, rpcRsp.contLen, &statusRsp) != 0) {
    tscError("failed to parse server status rsp since %s", terrstr());
    goto _OVER;
  }

  code = statusRsp.statusCode;
S
Shengliang Guan 已提交
1462
  if (details != NULL && statusRsp.details != NULL) {
S
Shengliang Guan 已提交
1463 1464 1465 1466 1467 1468 1469 1470 1471 1472 1473 1474
    tstrncpy(details, statusRsp.details, maxlen);
  }

_OVER:
  if (clientRpc != NULL) {
    rpcClose(clientRpc);
  }
  if (rpcRsp.pCont != NULL) {
    rpcFreeCont(rpcRsp.pCont);
  }
  return code;
}