clientImpl.c 27.4 KB
Newer Older
1

2
#include "clientInt.h"
3
#include "clientLog.h"
4
#include "command.h"
5
#include "scheduler.h"
H
Haojun Liao 已提交
6
#include "tdatablock.h"
7
#include "tdef.h"
8
#include "tglobal.h"
9
#include "tmsgtype.h"
H
Haojun Liao 已提交
10
#include "tpagedbuf.h"
11
#include "tref.h"
X
Xiaoyu Wang 已提交
12

S
Shengliang Guan 已提交
13
static int32_t       initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
dengyihao's avatar
dengyihao 已提交
14 15
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest);
static void          destroySendMsgInfo(SMsgSendInfo* pMsgBody);
H
Haojun Liao 已提交
16

17
/*
 * Check that a C string is present, non-empty and at most `maxsize` bytes long.
 *
 * str:     NUL-terminated string to check; may be NULL.
 * maxsize: largest accepted length in bytes, not counting the terminating NUL.
 *
 * Returns true when 1 <= strlen(str) <= maxsize, false otherwise (including for NULL).
 */
static bool stringLengthCheck(const char* str, size_t maxsize) {
  if (str == NULL) {
    return false;
  }

  // size_t is unsigned, so an empty string is the only way to violate the lower bound
  size_t len = strlen(str);
  return len != 0 && len <= maxsize;
}

dengyihao's avatar
dengyihao 已提交
30
/* A user name is valid when it is non-NULL, non-empty and at most TSDB_USER_LEN - 1 bytes long. */
static bool validateUserName(const char* user) {
  const size_t maxLen = TSDB_USER_LEN - 1;
  return stringLengthCheck(user, maxLen);
}
31

dengyihao's avatar
dengyihao 已提交
32
/* A password is valid when it is non-NULL, non-empty and at most TSDB_PASSWORD_LEN - 1 bytes long. */
static bool validatePassword(const char* passwd) {
  const size_t maxLen = TSDB_PASSWORD_LEN - 1;
  return stringLengthCheck(passwd, maxLen);
}
33

dengyihao's avatar
dengyihao 已提交
34
/* A database name is valid when it is non-NULL, non-empty and at most TSDB_DB_NAME_LEN - 1 bytes long. */
static bool validateDbName(const char* db) {
  const size_t maxLen = TSDB_DB_NAME_LEN - 1;
  return stringLengthCheck(db, maxLen);
}
35

36 37 38 39 40 41
/*
 * Build the lookup key that identifies one application instance.
 *
 * The key has the form "user:auth:ip:port". A NULL string component is rendered as an empty
 * field, because passing NULL to a "%s" conversion is undefined behaviour (taos_connect_internal
 * hands over a NULL ip when it falls back to the configured end points).
 *
 * Returns a heap copy made with strdup() that the caller must release, or NULL when that copy
 * cannot be allocated. The text is assembled in a 512-byte scratch buffer, so longer keys are
 * truncated to 511 bytes.
 */
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
  const char* safeUser = (user != NULL) ? user : "";
  const char* safeAuth = (auth != NULL) ? auth : "";
  const char* safeIp = (ip != NULL) ? ip : "";

  char key[512] = {0};
  snprintf(key, sizeof(key), "%s:%s:%s:%d", safeUser, safeAuth, safeIp, port);
  return strdup(key);
}

dengyihao's avatar
dengyihao 已提交
42 43
static STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
                                SAppInstInfo* pAppInfo);
44

dengyihao's avatar
dengyihao 已提交
45 46
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
                            uint16_t port) {
47 48 49 50
  if (taos_init() != TSDB_CODE_SUCCESS) {
    return NULL;
  }

51 52 53 54 55
  if (!validateUserName(user)) {
    terrno = TSDB_CODE_TSC_INVALID_USER_LENGTH;
    return NULL;
  }

56
  char localDb[TSDB_DB_NAME_LEN] = {0};
H
Haojun Liao 已提交
57
  if (db != NULL && strlen(db) > 0) {
dengyihao's avatar
dengyihao 已提交
58
    if (!validateDbName(db)) {
59 60 61 62
      terrno = TSDB_CODE_TSC_INVALID_DB_LENGTH;
      return NULL;
    }

63 64
    tstrncpy(localDb, db, sizeof(localDb));
    strdequote(localDb);
65 66
  }

67
  char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
68 69 70 71 72 73
  if (auth == NULL) {
    if (!validatePassword(pass)) {
      terrno = TSDB_CODE_TSC_INVALID_PASS_LENGTH;
      return NULL;
    }

dengyihao's avatar
dengyihao 已提交
74
    taosEncryptPass_c((uint8_t*)pass, strlen(pass), secretEncrypt);
75 76 77 78
  } else {
    tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt));
  }

79
  SCorEpSet epSet = {0};
S
Shengliang Guan 已提交
80 81 82 83 84 85 86 87 88 89 90 91 92
  if (ip) {
    if (initEpSetFromCfg(ip, NULL, &epSet) < 0) {
      return NULL;
    }

    if (port) {
      epSet.epSet.eps[0].port = port;
    }
  } else {
    if (initEpSetFromCfg(tsFirst, tsSecond, &epSet) < 0) {
      return NULL;
    }
  }
93

dengyihao's avatar
dengyihao 已提交
94
  char*          key = getClusterKey(user, secretEncrypt, ip, port);
H
Haojun Liao 已提交
95
  SAppInstInfo** pInst = NULL;
96

wafwerar's avatar
wafwerar 已提交
97
  taosThreadMutexLock(&appInfo.mutex);
H
Haojun Liao 已提交
98 99

  pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
L
Liu Jicong 已提交
100
  SAppInstInfo* p = NULL;
101
  if (pInst == NULL) {
wafwerar's avatar
wafwerar 已提交
102
    p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo));
dengyihao's avatar
dengyihao 已提交
103
    p->mgmtEp = epSet;
H
Haojun Liao 已提交
104
    p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
D
dapan 已提交
105
    p->pAppHbMgr = appHbMgrInit(p, key);
H
Haojun Liao 已提交
106
    taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
107

H
Haojun Liao 已提交
108
    pInst = &p;
109 110
  }

wafwerar's avatar
wafwerar 已提交
111
  taosThreadMutexUnlock(&appInfo.mutex);
H
Haojun Liao 已提交
112

wafwerar's avatar
wafwerar 已提交
113
  taosMemoryFreeClear(key);
H
Haojun Liao 已提交
114
  return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst);
115 116
}

dengyihao's avatar
dengyihao 已提交
117
/*
 * Create a request for `sql` on connection `pTscObj` and register it in pTscObj->pRequests.
 *
 * pTscObj:  connection that owns the request.
 * sql:      statement text; the first `sqlLen` bytes are copied through strntolower().
 * sqlLen:   number of bytes of `sql` to use.
 * pRequest: receives the new request. It is NULL when creation or registration failed, and
 *           still points at the request when only the SQL buffer allocation failed.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_OUT_OF_MEMORY on any failure.
 */
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest) {
  SRequestObj* pReq = createRequest(pTscObj, NULL, NULL, TSDB_SQL_SELECT);
  *pRequest = pReq;
  if (pReq == NULL) {
    tscError("failed to malloc sqlObj");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  pReq->sqlstr = taosMemoryMalloc(sqlLen + 1);
  if (pReq->sqlstr == NULL) {
    tscError("0x%" PRIx64 " failed to prepare sql string buffer", pReq->self);
    pReq->msgBuf = strdup("failed to prepare sql string buffer");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  // keep a NUL-terminated private copy of the statement
  strntolower(pReq->sqlstr, sql, (int32_t)sqlLen);
  pReq->sqlstr[sqlLen] = 0;
  pReq->sqlLen = sqlLen;

  // the request's own id serves as both key and value in the per-connection request table
  if (taosHashPut(pTscObj->pRequests, &pReq->self, sizeof(pReq->self), &pReq->self, sizeof(pReq->self))) {
    destroyRequest(pReq);
    *pRequest = NULL;
    tscError("put request to request hash failed");
    return TSDB_CODE_TSC_OUT_OF_MEMORY;
  }

  tscDebugL("0x%" PRIx64 " SQL: %s, reqId:0x%" PRIx64, pReq->self, pReq->sqlstr, pReq->requestId);
  return TSDB_CODE_SUCCESS;
}
145

D
stmt  
dapan1121 已提交
146
/*
 * Parse the SQL text stored in `pRequest` into a query object.
 *
 * pRequest:   request whose sqlstr/sqlLen/msgBuf/pDb feed the parser context.
 * topicQuery: forwarded to the parser context unchanged.
 * pQuery:     receives the parsed query from qParseQuerySql().
 * pStmtCb:    forwarded to the parser context unchanged.
 *
 * On success the result schema (if the query has a result set) is copied into the request and
 * the request's dbList/tableList are swapped with the query's pDbList/pTableList.
 *
 * Returns TSDB_CODE_SUCCESS, or the error code of catalogGetHandle() / qParseQuerySql().
 */
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb) {
  STscObj* pTscObj = pRequest->pTscObj;

  SParseContext cxt = {
      .requestId = pRequest->requestId,
      .acctId = pTscObj->acctId,
      .db = pRequest->pDb,
      .topicQuery = topicQuery,
      .pSql = pRequest->sqlstr,
      .sqlLen = pRequest->sqlLen,
      .pMsg = pRequest->msgBuf,
      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
      .pTransporter = pTscObj->pAppInfo->pTransporter,
      .pStmtCb = pStmtCb,
  };
  cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);

  int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  code = qParseQuerySql(&cxt, pQuery);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  if ((*pQuery)->haveResultSet) {
    setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols);
  }

  // take over the db/table lists collected by the parser
  TSWAP(pRequest->dbList, (*pQuery)->pDbList, SArray*);
  TSWAP(pRequest->tableList, (*pQuery)->pTableList, SArray*);

  return code;
}
H
Haojun Liao 已提交
180

181 182
/*
 * Run a command locally via qExecCommand() and, when it produced a response, install that
 * response as the request's result set.
 *
 * Returns the code of qExecCommand(), or of setQueryResultFromRsp() when a response was produced.
 */
int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
  SRetrieveTableRsp* pLocalRsp = NULL;

  int32_t code = qExecCommand(pQuery->pRoot, &pLocalRsp);
  if (code != TSDB_CODE_SUCCESS || pLocalRsp == NULL) {
    return code;
  }

  return setQueryResultFromRsp(&pRequest->body.resInfo, pLocalRsp, false);
}

X
Xiaoyu Wang 已提交
190
/*
 * Send the command message carried by `pQuery` to the server and block until the request's
 * response semaphore is posted.
 *
 * A query without a command message (e.g. "drop table if exists not_exists_table") needs no
 * round trip and succeeds immediately.
 *
 * Always returns TSDB_CODE_SUCCESS.
 */
int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
  SCmdMsgInfo* pCmd = pQuery->pCmdMsg;
  if (pCmd == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pRequest->type = pCmd->msgType;
  pRequest->body.requestMsg = (SDataBuf){.pData = pCmd->pMsg, .len = pCmd->msgLen, .handle = NULL};
  pCmd->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management

  STscObj*      pTscObj = pRequest->pTscObj;
  SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);

  if (pCmd->msgType == TDMT_VND_SHOW_TABLES) {
    SShowReqInfo* pShow = &pRequest->body.showInfo;
    if (pShow->pArray == NULL) {
      // set the first vnode, then iterate the next vnode
      pShow->currentIndex = 0;
      pShow->pArray = pCmd->pExtension;
    }
  }

  int64_t transporterId = 0;
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pCmd->epSet, &transporterId, pSendMsg);

  tsem_wait(&pRequest->body.rspSem);
  return TSDB_CODE_SUCCESS;
}
X
Xiaoyu Wang 已提交
217

218 219
/*
 * Record the query's message type on the request and build its query plan.
 *
 * pPlan and pNodeList are passed straight to qCreateQueryPlan(), whose code is returned.
 */
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
  pRequest->type = pQuery->msgType;

  SPlanContext planCxt = {
      .queryId = pRequest->requestId,
      .acctId = pRequest->pTscObj->acctId,
      .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
      .pAstRoot = pQuery->pRoot,
      .showRewrite = pQuery->showRewrite,
  };

  return qCreateQueryPlan(&planCxt, pPlan, pNodeList);
}

232 233
/*
 * Allocate and fill the two field descriptor arrays of a result set from a schema.
 *
 * `fields` receives name, type and byte width exactly as in the schema. `userFields` receives
 * the same name and type, but for VARCHAR the byte width has VARSTR_HEADER_SIZE subtracted and
 * for NCHAR it is additionally divided by TSDB_NCHAR_SIZE.
 *
 * NOTE(review): the two allocations are not checked before the loop writes through them.
 */
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols) {
  assert(pSchema != NULL && numOfCols > 0);

  pResInfo->numOfCols = numOfCols;
  pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
  pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));

  for (int32_t col = 0; col < pResInfo->numOfCols; ++col) {
    const SSchema* pSrc = &pSchema[col];

    pResInfo->fields[col].bytes = pSrc->bytes;
    pResInfo->fields[col].type = pSrc->type;
    tstrncpy(pResInfo->fields[col].name, pSrc->name, tListLen(pResInfo->fields[col].name));

    pResInfo->userFields[col].bytes = pSrc->bytes;
    pResInfo->userFields[col].type = pSrc->type;
    tstrncpy(pResInfo->userFields[col].name, pSrc->name, tListLen(pResInfo->userFields[col].name));

    // variable-length types: strip the length header from the width reported in userFields
    switch (pSrc->type) {
      case TSDB_DATA_TYPE_VARCHAR:
        pResInfo->userFields[col].bytes -= VARSTR_HEADER_SIZE;
        break;
      case TSDB_DATA_TYPE_NCHAR:
        pResInfo->userFields[col].bytes = (pResInfo->userFields[col].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
        break;
      default:
        break;
    }
  }
}

X
Xiaoyu Wang 已提交
257
/*
 * Execute a query plan through the scheduler and record the outcome on the request.
 *
 * When schedulerExecJob() fails, its code is stored in pRequest->code and terrno. Otherwise the
 * code reported in the SQueryResult is stored, and for TDMT_VND_SUBMIT / TDMT_VND_CREATE_TABLE
 * requests the affected row count is copied into the result info. The scheduler job is freed on
 * failure and after submit/create-table requests, provided the job handle is non-zero.
 *
 * Returns pRequest->code.
 */
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
  void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;

  SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
  int32_t      code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr,
                                       pRequest->metric.start, &res);

  if (code == TSDB_CODE_SUCCESS) {
    if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_CREATE_TABLE == pRequest->type) {
      pRequest->body.resInfo.numOfRows = res.numOfRows;

      if (pRequest->body.queryJob != 0) {
        schedulerFreeJob(pRequest->body.queryJob);
      }
    }

    // the job was accepted; the final status is the one reported in the result
    code = res.code;
  } else if (pRequest->body.queryJob != 0) {
    schedulerFreeJob(pRequest->body.queryJob);
  }

  pRequest->code = code;
  terrno = code;
  return pRequest->code;
}
X
Xiaoyu Wang 已提交
285

D
dapan1121 已提交
286
/*
 * Build, parse and execute one SQL statement according to the execution mode chosen by the parser.
 *
 * Returns the request created by buildRequest(); this is whatever buildRequest() left in its
 * output parameter, which can be NULL. When any step failed and a request exists, its code is
 * set from terrno.
 *
 * NOTE(review): the failure path stores terrno rather than the local `code`; confirm that every
 * failing callee sets terrno.
 */
SRequestObj* execQueryImpl(STscObj* pTscObj, const char* sql, int sqlLen) {
  SRequestObj* pRequest = NULL;
  SQuery*      pQuery = NULL;
  SArray*      pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));

  int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest);
  if (code == TSDB_CODE_SUCCESS) {
    code = parseSql(pRequest, false, &pQuery, NULL);
  }

  if (code == TSDB_CODE_SUCCESS) {
    if (pQuery->execMode == QUERY_EXEC_MODE_LOCAL) {
      code = execLocalCmd(pRequest, pQuery);
    } else if (pQuery->execMode == QUERY_EXEC_MODE_RPC) {
      code = execDdlQuery(pRequest, pQuery);
    } else if (pQuery->execMode == QUERY_EXEC_MODE_SCHEDULE) {
      code = getPlan(pRequest, pQuery, &pRequest->body.pDag, pNodeList);
      if (code == TSDB_CODE_SUCCESS) {
        code = scheduleQuery(pRequest, pRequest->body.pDag, pNodeList);
      }
    } else if (pQuery->execMode == QUERY_EXEC_MODE_EMPTY_RESULT) {
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
    }
  }

  taosArrayDestroy(pNodeList);
  qDestroyQuery(pQuery);

  if (pRequest != NULL && code != TSDB_CODE_SUCCESS) {
    pRequest->code = terrno;
  }

  return pRequest;
}

D
dapan1121 已提交
327
/*
 * Refresh the catalog entries for every database and table referenced by a request.
 *
 * Returns TSDB_CODE_QRY_APP_ERROR when the request references neither databases nor tables,
 * the first error reported by the catalog, or 0 when every refresh succeeded.
 */
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
  int32_t numOfDbs = taosArrayGetSize(pRequest->dbList);
  int32_t numOfTables = taosArrayGetSize(pRequest->tableList);
  if (numOfDbs <= 0 && numOfTables <= 0) {
    return TSDB_CODE_QRY_APP_ERROR;
  }

  SCatalog* pCatalog = NULL;
  int32_t   code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  SEpSet mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);

  for (int32_t idx = 0; idx < numOfDbs; ++idx) {
    char* dbFName = taosArrayGet(pRequest->dbList, idx);

    code = catalogRefreshDBVgInfo(pCatalog, pTscObj->pAppInfo->pTransporter, &mgmtEpSet, dbFName);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  for (int32_t idx = 0; idx < numOfTables; ++idx) {
    SName* pTableName = taosArrayGet(pRequest->tableList, idx);

    code = catalogRefreshTableMeta(pCatalog, pTscObj->pAppInfo->pTransporter, &mgmtEpSet, pTableName, -1);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  return code;
}

/*
 * Execute a SQL statement, retrying up to REQUEST_MAX_TRY_TIMES times when the failure is one
 * the client can repair by refreshing its cached metadata (NEED_CLIENT_HANDLE_ERROR).
 *
 * Returns the request of the last attempt, or NULL when execQueryImpl() produced no request.
 * A returned request is always live: the failed request of the final attempt is kept (not
 * destroyed) so that the caller can read its error code.
 */
SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
  SRequestObj* pRequest = NULL;

  for (int32_t attempt = 0; attempt < REQUEST_MAX_TRY_TIMES; ++attempt) {
    pRequest = execQueryImpl(pTscObj, sql, sqlLen);
    if (pRequest == NULL) {
      // no request object could be built; there is nothing to inspect or retry with
      break;
    }

    if (TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) {
      break;
    }

    int32_t code = refreshMeta(pTscObj, pRequest);
    if (code) {
      pRequest->code = code;
      break;
    }

    if (attempt + 1 >= REQUEST_MAX_TRY_TIMES) {
      // out of retries: hand the failed request back instead of a pointer to freed memory
      break;
    }

    destroyRequest(pRequest);
    pRequest = NULL;
  }

  return pRequest;
}

/*
 * Execute the first `sqlLen` bytes of `sql` on connection `taos`.
 *
 * The length is compared against TSDB_MAX_ALLOWED_SQL_LEN after conversion to size_t; when it
 * exceeds the limit, terrno is set to TSDB_CODE_TSC_EXCEED_SQL_LIMIT and NULL is returned.
 * Otherwise the result of execQuery() is returned.
 */
TAOS_RES* taos_query_l(TAOS* taos, const char* sql, int sqlLen) {
  if (sqlLen <= (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
    return execQuery((STscObj*)taos, sql, sqlLen);
  }

  tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
  terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
  return NULL;
}

S
Shengliang Guan 已提交
399 400
/*
 * Initialise the mnode end point set from up to two "fqdn:port" strings.
 *
 * firstEp / secondEp: end point strings; NULL or empty strings are skipped.
 * pEpSet:             set to fill. version, inUse and numOfEps are reset, then each usable end
 *                     point is parsed into the next free slot with taosGetFqdnPortFromEp().
 *
 * Returns 0 on success. Returns -1 with terrno = TSDB_CODE_TSC_INVALID_FQDN when an end point
 * is TSDB_EP_LEN bytes or longer, or when no usable end point was supplied.
 */
int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
  const char* candidates[] = {firstEp, secondEp};

  pEpSet->version = 0;

  // init mnode ip set
  SEpSet* pMgmt = &(pEpSet->epSet);
  pMgmt->numOfEps = 0;
  pMgmt->inUse = 0;

  for (size_t i = 0; i < sizeof(candidates) / sizeof(candidates[0]); ++i) {
    const char* ep = candidates[i];
    if (ep == NULL || ep[0] == 0) {
      continue;
    }

    if (strlen(ep) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return -1;
    }

    taosGetFqdnPortFromEp(ep, &pMgmt->eps[pMgmt->numOfEps]);
    pMgmt->numOfEps++;
  }

  if (pMgmt->numOfEps == 0) {
    terrno = TSDB_CODE_TSC_INVALID_FQDN;
    return -1;
  }

  return 0;
}

dengyihao's avatar
dengyihao 已提交
435 436 437
/*
 * Create a connection object, send the connect request to the management end points of
 * `pAppInfo`, and wait for the response.
 *
 * user/auth/db: forwarded to createTscObj().
 * fp/param:     forwarded to createRequest().
 * pAppInfo:     application instance whose transporter and mgmtEp are used for the request.
 *
 * Returns the connection object on success. Returns NULL when an object or the connect message
 * cannot be created, or when the server answered with an error; in the latter case the reason
 * is printed to stdout.
 */
STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
                         SAppInstInfo* pAppInfo) {
  STscObj* pTscObj = createTscObj(user, auth, db, pAppInfo);
  if (NULL == pTscObj) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return pTscObj;
  }

  SRequestObj* pRequest = createRequest(pTscObj, fp, param, TDMT_MND_CONNECT);
  if (pRequest == NULL) {
    destroyTscObj(pTscObj);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  SMsgSendInfo* body = buildConnectMsg(pRequest);
  if (body == NULL) {
    // buildConnectMsg() set terrno; without a message there is nothing to send, so waiting on
    // the response semaphore would never return
    destroyRequest(pRequest);
    taos_close(pTscObj);
    return NULL;
  }

  int64_t transporterId = 0;
  asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);

  tsem_wait(&pRequest->body.rspSem);
  if (pRequest->code != TSDB_CODE_SUCCESS) {
    const char* errorMsg =
        (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
    printf("failed to connect to server, reason: %s\n\n", errorMsg);

    destroyRequest(pRequest);
    taos_close(pTscObj);
    pTscObj = NULL;
  } else {
    tscDebug("0x%" PRIx64 " connection is opening, connId:%u, dnodeConn:%p, reqId:0x%" PRIx64, pTscObj->id,
             pTscObj->connId, pTscObj->pAppInfo->pTransporter, pRequest->requestId);
    destroyRequest(pRequest);
  }

  return pTscObj;
}

dengyihao's avatar
dengyihao 已提交
473
/*
 * Build the TDMT_MND_CONNECT message for a request.
 *
 * The message carries the connection's current database (if any), the process id, start time
 * and application name from appInfo, serialized with tSerializeSConnectReq().
 *
 * Returns a heap-allocated SMsgSendInfo whose msgInfo.pData holds the serialized request, or
 * NULL with terrno = TSDB_CODE_TSC_OUT_OF_MEMORY when either allocation fails.
 */
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pMsgSendInfo == NULL) {
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }

  pMsgSendInfo->msgType = TDMT_MND_CONNECT;

  pMsgSendInfo->requestObjRefId = pRequest->self;
  pMsgSendInfo->requestId = pRequest->requestId;
  pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)];
  pMsgSendInfo->param = pRequest;

  SConnectReq connectReq = {0};
  STscObj*    pObj = pRequest->pTscObj;

  char* db = getDbOfConnection(pObj);
  if (db != NULL) {
    tstrncpy(connectReq.db, db, sizeof(connectReq.db));
  }
  taosMemoryFreeClear(db);

  connectReq.pid = htonl(appInfo.pid);
  connectReq.startTime = htobe64(appInfo.startTime);
  tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));

  // first pass computes the encoded size, second pass writes into the buffer
  int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
  void*   pReq = taosMemoryMalloc(contLen);
  if (pReq == NULL) {
    // do not serialize into a NULL buffer, and do not leak the half-built send info
    taosMemoryFreeClear(pMsgSendInfo);
    terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
    return NULL;
  }
  tSerializeSConnectReq(pReq, contLen, &connectReq);

  pMsgSendInfo->msgInfo.len = contLen;
  pMsgSendInfo->msgInfo.pData = pReq;
  return pMsgSendInfo;
}

509
/* Release a send-info object together with the message payload it holds. `pMsgBody` must not be NULL. */
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
  assert(pMsgBody != NULL);

  // payload first, then the container that points at it
  taosMemoryFreeClear(pMsgBody->msgInfo.pData);
  taosMemoryFreeClear(pMsgBody);
}
514

dengyihao's avatar
dengyihao 已提交
515
/*
 * Report whether `msgType` is one of the vnode query-related response types
 * (query, fetch, res-ready, query-heartbeat). The first argument is not used.
 */
bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType) {
  if (msgType == TDMT_VND_QUERY_RSP) {
    return true;
  }
  if (msgType == TDMT_VND_FETCH_RSP) {
    return true;
  }
  if (msgType == TDMT_VND_RES_READY_RSP) {
    return true;
  }
  return msgType == TDMT_VND_QUERY_HEARTBEAT_RSP;
}
519

520
/*
 * Handle a response message delivered by the transport layer.
 *
 * pMsg->ahandle must be the SMsgSendInfo that was handed over when the request was sent. When
 * that send info refers to a request object, the response time is recorded, the cached
 * management end point set is updated if the server reported a different one, and the round
 * trip is logged. The payload is then copied into a fresh buffer and passed to the send info's
 * callback, after which the RPC payload and the send info are released.
 */
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->ahandle;
  assert(pMsg->ahandle != NULL);

  if (pSendInfo->requestObjRefId != 0) {
    SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
    assert(pRequest->self == pSendInfo->requestObjRefId);

    pRequest->metric.rsp = taosGetTimestampUs();

    STscObj* pTscObj = pRequest->pTscObj;
    if (pEpSet != NULL && !isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, pEpSet)) {
      updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
    }

    /*
     * There is not response callback function for submit response.
     * The actual inserted number of points is the first number.
     */
    int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start;
    if (pMsg->code != TSDB_CODE_SUCCESS) {
      tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%" PRIx64, pRequest->self,
               TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed / 1000, pRequest->requestId);
    } else {
      tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%" PRIx64, pRequest->self,
               TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed / 1000, pRequest->requestId);
    }

    taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
  }

  // give the callback its own copy of the payload; the RPC buffer is freed below
  SDataBuf buf = {.len = pMsg->contLen, .pData = NULL, .handle = pMsg->handle};

  if (pMsg->contLen > 0) {
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
    if (buf.pData != NULL) {
      memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    } else {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pSendInfo);
}
569

dengyihao's avatar
dengyihao 已提交
570
/*
 * Connect using an already-encrypted secret instead of a plain-text password.
 *
 * ip, db: forwarded to taos_connect_internal() unchanged (either may be NULL).
 * user:   user name; NULL selects TSDB_DEFAULT_USER.
 * auth:   secret to authenticate with; must not be NULL.
 * port:   forwarded to taos_connect_internal().
 *
 * Returns the connection, or NULL when `auth` is missing or the connection attempt fails.
 */
TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) {
  // ip, user and db may legitimately be NULL here; "%s" with a NULL argument is undefined
  // behaviour, so absent values are logged as a literal "(null)"
  tscDebug("try to connect to %s:%u by auth, user:%s db:%s", (ip != NULL) ? ip : "(null)", port,
           (user != NULL) ? user : "(null)", (db != NULL) ? db : "(null)");
  if (user == NULL) {
    user = TSDB_DEFAULT_USER;
  }

  if (auth == NULL) {
    tscError("No auth info is given, failed to connect to server");
    return NULL;
  }

  return taos_connect_internal(ip, user, NULL, auth, db, port);
}

dengyihao's avatar
dengyihao 已提交
584 585 586
TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen,
                     const char* db, int dbLen, uint16_t port) {
  char ipStr[TSDB_EP_LEN] = {0};
587
  char dbStr[TSDB_DB_NAME_LEN] = {0};
dengyihao's avatar
dengyihao 已提交
588 589
  char userStr[TSDB_USER_LEN] = {0};
  char passStr[TSDB_PASSWORD_LEN] = {0};
590

dengyihao's avatar
dengyihao 已提交
591
  strncpy(ipStr, ip, TMIN(TSDB_EP_LEN - 1, ipLen));
dengyihao's avatar
dengyihao 已提交
592 593
  strncpy(userStr, user, TMIN(TSDB_USER_LEN - 1, userLen));
  strncpy(passStr, pass, TMIN(TSDB_PASSWORD_LEN - 1, passLen));
dengyihao's avatar
dengyihao 已提交
594
  strncpy(dbStr, db, TMIN(TSDB_DB_NAME_LEN - 1, dbLen));
595
  return taos_connect(ipStr, userStr, passStr, dbStr, port);
596 597
}

L
Liu Jicong 已提交
598
/*
 * Point row[i] (and, for variable-length columns, length[i]) at the value of every column in
 * the row selected by pResultInfo->current.
 *
 * Variable-length columns are located through the column's offset array, where -1 marks NULL;
 * fixed-length columns are located at `bytes * current` and use the null bitmap. NULL values
 * yield row[i] == NULL.
 */
void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
    SResultColumn* pCol = &pResultInfo->pCol[i];

    int32_t type = pResultInfo->fields[i].type;
    int32_t bytes = pResultInfo->fields[i].bytes;

    if (IS_VAR_DATA_TYPE(type)) {
      if (pCol->offset[pResultInfo->current] == -1) {
        pResultInfo->row[i] = NULL;
        continue;
      }

      char* pValue = pCol->offset[pResultInfo->current] + pCol->pData;
      pResultInfo->length[i] = varDataLen(pValue);
      pResultInfo->row[i] = varDataVal(pValue);
      continue;
    }

    if (colDataIsNull_f(pCol->nullbitmap, pResultInfo->current)) {
      pResultInfo->row[i] = NULL;
    } else {
      pResultInfo->row[i] = pCol->pData + bytes * pResultInfo->current;
    }
  }
}

624
/*
 * Return the next row of a request's result set, fetching another block from the server when
 * the current block is exhausted.
 *
 * pRequest:       request to read from; must not be NULL.
 * setupOneRowPtr: when true, the row pointers are set up for the current row via
 *                 doSetOneRowPtr() and the cursor is advanced by one.
 * convertUcs4:    forwarded to setQueryResultFromRsp() for TDMT_VND_QUERY results.
 *
 * How the next block is fetched depends on pRequest->type:
 *  - TDMT_VND_QUERY: rows are pulled from the scheduler job.
 *  - TDMT_MND_SHOW / TDMT_MND_SHOW_RETRIEVE: a retrieve message goes to the management end points.
 *  - TDMT_VND_SHOW_TABLES / TDMT_VND_SHOW_TABLES_FETCH: the vgroups in showInfo.pArray are
 *    visited one after another.
 *
 * Returns pResultInfo->row, or NULL when there is no more data or a fetch failed (in which case
 * pRequest->code holds the error for the failures detected here).
 */
void* doFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
  assert(pRequest != NULL);
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;

  SEpSet epSet = {0};

  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
    if (pRequest->type == TDMT_VND_QUERY) {
      // All data has returned to App already, no need to try again
      if (pResultInfo->completed) {
        pResultInfo->numOfRows = 0;
        return NULL;
      }

      pRequest->code = schedulerFetchRows(pRequest->body.queryJob, (void**)&pResultInfo->pData);
      if (pRequest->code != TSDB_CODE_SUCCESS) {
        pResultInfo->numOfRows = 0;
        return NULL;
      }

      pRequest->code = setQueryResultFromRsp(pResultInfo, (SRetrieveTableRsp*)pResultInfo->pData, convertUcs4);
      if (pRequest->code != TSDB_CODE_SUCCESS) {
        pResultInfo->numOfRows = 0;
        return NULL;
      }

      tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64,
               pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed,
               pRequest->requestId);

      if (pResultInfo->numOfRows == 0) {
        return NULL;
      }

      goto _return;
    } else if (pRequest->type == TDMT_MND_SHOW) {
      pRequest->type = TDMT_MND_SHOW_RETRIEVE;
      epSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
    } else if (pRequest->type == TDMT_VND_SHOW_TABLES) {
      pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
      SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
      SVgroupInfo*  pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex);

      epSet = pVgroupInfo->epSet;
    } else if (pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) {
      // the current vgroup is drained: issue a new show-tables request to the next vgroup
      pRequest->type = TDMT_VND_SHOW_TABLES;
      SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
      pShowReqInfo->currentIndex += 1;
      if (pShowReqInfo->currentIndex >= taosArrayGetSize(pShowReqInfo->pArray)) {
        return NULL;
      }

      SVgroupInfo*     pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex);
      SVShowTablesReq* pShowReq = taosMemoryCalloc(1, sizeof(SVShowTablesReq));
      if (pShowReq == NULL) {
        // cannot build the request for the next vgroup; report the failure instead of crashing
        pRequest->code = TSDB_CODE_TSC_OUT_OF_MEMORY;
        terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
        pResultInfo->numOfRows = 0;
        return NULL;
      }
      pShowReq->head.vgId = htonl(pVgroupInfo->vgId);

      pRequest->body.requestMsg.len = sizeof(SVShowTablesReq);
      pRequest->body.requestMsg.pData = pShowReq;

      SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
      epSet = pVgroupInfo->epSet;

      int64_t  transporterId = 0;
      STscObj* pTscObj = pRequest->pTscObj;
      asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body);
      tsem_wait(&pRequest->body.rspSem);

      pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
    } else if (pRequest->type == TDMT_MND_SHOW_RETRIEVE) {
      epSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);

      if (pResultInfo->completed) {
        return NULL;
      }
    }

    if (pResultInfo->completed) {
      pResultInfo->numOfRows = 0;
      return NULL;
    }

    SMsgSendInfo* body = buildMsgInfoImpl(pRequest);

    int64_t  transporterId = 0;
    STscObj* pTscObj = pRequest->pTscObj;
    asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body);

    tsem_wait(&pRequest->body.rspSem);

    pResultInfo->current = 0;
    if (pResultInfo->numOfRows <= pResultInfo->current) {
      return NULL;
    }
  }

_return:
  if (setupOneRowPtr) {
    doSetOneRowPtr(pResultInfo);
    pResultInfo->current += 1;
  }

  return pResultInfo->row;
}

728
/*
 * Lazily allocate the per-column bookkeeping arrays of a result set (row, pCol, length and
 * convertBuf), each with numOfCols zero-initialised entries.
 *
 * Nothing is done when `row` is already allocated. If any allocation fails, all four arrays are
 * released and reset to NULL, so that a later call retries from a clean state instead of seeing
 * a non-NULL `row` next to NULL siblings.
 *
 * Returns TSDB_CODE_SUCCESS or TSDB_CODE_OUT_OF_MEMORY.
 */
static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
  if (pResInfo->row != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pResInfo->row = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
  pResInfo->pCol = taosMemoryCalloc(pResInfo->numOfCols, sizeof(SResultColumn));
  pResInfo->length = taosMemoryCalloc(pResInfo->numOfCols, sizeof(int32_t));
  pResInfo->convertBuf = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);

  if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL || pResInfo->convertBuf == NULL) {
    taosMemoryFreeClear(pResInfo->row);
    taosMemoryFreeClear(pResInfo->pCol);
    taosMemoryFreeClear(pResInfo->length);
    taosMemoryFreeClear(pResInfo->convertBuf);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}

743 744 745 746 747
/*
 * Convert the NCHAR columns of the current data block from UCS-4 to the
 * multi-byte encoding produced by taosUcs4ToMbs().
 *
 * For every column whose field type is TSDB_DATA_TYPE_NCHAR and whose
 * colLength entry is positive, the column's convertBuf slot is (re)allocated
 * to colLength[i] bytes, each row with an offset other than -1 is converted
 * into it, the row offset is rewritten relative to the new buffer, and the
 * column's pData / row pointers are switched to the converted buffer.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY if a buffer cannot be
 * (re)allocated.
 */
static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int32_t numOfCols, int32_t* colLength) {
  for (int32_t colIdx = 0; colIdx < numOfCols; ++colIdx) {
    int32_t type = pResultInfo->fields[colIdx].type;
    int32_t bytes = pResultInfo->fields[colIdx].bytes;

    // Only non-empty NCHAR columns are converted.
    if (type != TSDB_DATA_TYPE_NCHAR || colLength[colIdx] <= 0) {
      continue;
    }

    char* pDst = taosMemoryRealloc(pResultInfo->convertBuf[colIdx], colLength[colIdx]);
    if (pDst == NULL) {
      return TSDB_CODE_OUT_OF_MEMORY;
    }

    pResultInfo->convertBuf[colIdx] = pDst;

    SResultColumn* pColumn = &pResultInfo->pCol[colIdx];
    for (int32_t rowIdx = 0; rowIdx < numOfRows; ++rowIdx) {
      // Rows with offset -1 are left untouched (presumably NULL values -- confirm).
      if (pColumn->offset[rowIdx] == -1) {
        continue;
      }

      char* pSrc = pColumn->pData + pColumn->offset[rowIdx];

      int32_t len = taosUcs4ToMbs((TdUcs4*)varDataVal(pSrc), varDataLen(pSrc), varDataVal(pDst));
      ASSERT(len <= bytes);

      // Write the var-data header, re-point the row at the converted value and
      // advance the write position past header + payload.
      varDataSetLen(pDst, len);
      pColumn->offset[rowIdx] = (pDst - pResultInfo->convertBuf[colIdx]);
      pDst += (len + VARSTR_HEADER_SIZE);
    }

    // From now on the column data lives in the converted buffer.
    pColumn->pData = pResultInfo->convertBuf[colIdx];
    pResultInfo->row[colIdx] = pColumn->pData;
  }

  return TSDB_CODE_SUCCESS;
}

L
Liu Jicong 已提交
778 779
/*
 * Point the per-column descriptors of pResultInfo at the data block stored in
 * pResultInfo->pData.
 *
 * As read by this function, the block starts with numOfCols int32_t column
 * lengths, followed per column by either a row-offset array (variable-length
 * types) or a null bitmap (other types) and then colLength[i] bytes of column
 * data. The column lengths are byte-swapped in place with htonl().
 *
 * NOTE(review): the offset array is sized with the numOfRows parameter while
 * the bitmap is sized with pResultInfo->numOfRows, and field metadata is read
 * from pResultInfo->fields rather than pFields -- confirm callers always pass
 * matching values.
 *
 * Returns TSDB_CODE_SUCCESS when numOfRows is 0 (nothing is touched), the
 * error from doPrepareResPtr() if preparation fails, otherwise the result of
 * doConvertUCS4() when convertUcs4 is set, or TSDB_CODE_SUCCESS.
 */
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows,
                         bool convertUcs4) {
  assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL);
  if (numOfRows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = doPrepareResPtr(pResultInfo);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // The length table sits at the head of the block; the cursor starts right after it.
  int32_t* colLength = (int32_t*)pResultInfo->pData;
  char*    pCursor = ((char*)pResultInfo->pData) + sizeof(int32_t) * numOfCols;

  for (int32_t colIdx = 0; colIdx < numOfCols; ++colIdx) {
    SResultColumn* pColumn = &pResultInfo->pCol[colIdx];

    colLength[colIdx] = htonl(colLength[colIdx]);

    if (IS_VAR_DATA_TYPE(pResultInfo->fields[colIdx].type)) {
      pColumn->offset = (int32_t*)pCursor;
      pCursor += numOfRows * sizeof(int32_t);
    } else {
      pColumn->nullbitmap = pCursor;
      pCursor += BitmapLen(pResultInfo->numOfRows);
    }

    pColumn->pData = pCursor;
    pResultInfo->length[colIdx] = pResultInfo->fields[colIdx].bytes;
    pResultInfo->row[colIdx] = pColumn->pData;

    pCursor += colLength[colIdx];
  }

  if (!convertUcs4) {
    return code;
  }

  // convert UCS4-LE encoded character to native multi-bytes character in current data block.
  return doConvertUCS4(pResultInfo, numOfRows, numOfCols, colLength);
}

H
Haojun Liao 已提交
818
/*
 * Return a heap-allocated copy of the connection's current database name.
 *
 * The name is read while holding pObj->mutex. NULL is returned when the name
 * is empty (or when strndup() fails). A non-NULL result comes from strndup(),
 * so the caller is expected to release it.
 */
char* getDbOfConnection(STscObj* pObj) {
  char* dbCopy = NULL;

  taosThreadMutexLock(&pObj->mutex);
  if (pObj->db[0] != '\0') {
    dbCopy = strndup(pObj->db, tListLen(pObj->db));
  }
  taosThreadMutexUnlock(&pObj->mutex);

  return dbCopy;
}

/*
 * Set the connection's current database name to db.
 *
 * Both arguments must be non-NULL (asserted). The copy into pTscObj->db is
 * done under pTscObj->mutex and is limited to the size of that array via
 * tstrncpy().
 */
void setConnectionDB(STscObj* pTscObj, const char* db) {
  assert(db != NULL && pTscObj != NULL);

  taosThreadMutexLock(&pTscObj->mutex);

  // The destination capacity is taken from the array itself.
  tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));

  taosThreadMutexUnlock(&pTscObj->mutex);
}
S
Shengliang Guan 已提交
836

H
Haojun Liao 已提交
837 838 839 840 841 842 843 844 845 846
/*
 * Clear the connection's current database name.
 *
 * A NULL pTscObj is ignored. Otherwise the first byte of pTscObj->db is set to
 * the terminator while holding pTscObj->mutex, making the name empty.
 */
void resetConnectDB(STscObj* pTscObj) {
  if (pTscObj != NULL) {
    taosThreadMutexLock(&pTscObj->mutex);
    pTscObj->db[0] = 0;
    taosThreadMutexUnlock(&pTscObj->mutex);
  }
}

847
/*
 * Load a retrieve-table response into the result-info structure.
 *
 * Records the raw message and payload pointers, copies the row count,
 * completion flag, compressed length and precision out of pRsp (row count and
 * compressed length go through htonl()), resets the row cursor, adds the new
 * rows to totalRows, and then lets setResultDataPtr() wire up the per-column
 * pointers.
 *
 * Both pointers must be non-NULL (asserted). Returns whatever
 * setResultDataPtr() returns.
 */
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4) {
  assert(pResultInfo != NULL && pRsp != NULL);

  // Remember where the message and its payload live.
  pResultInfo->pRspMsg = (const char*)pRsp;
  pResultInfo->pData = (void*)pRsp->data;

  // Scalar fields taken from the response header.
  pResultInfo->precision = pRsp->precision;
  pResultInfo->completed = (pRsp->completed == 1);
  pResultInfo->payloadLen = htonl(pRsp->compLen);
  pResultInfo->numOfRows = htonl(pRsp->numOfRows);

  // Start reading the new block from its first row.
  pResultInfo->current = 0;

  // TODO handle the compressed case
  pResultInfo->totalRows += pResultInfo->numOfRows;

  return setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows,
                          convertUcs4);
}